diff --git a/docs/changelog/80104.yaml b/docs/changelog/80104.yaml new file mode 100644 index 0000000000000..6fe72ad7eb814 --- /dev/null +++ b/docs/changelog/80104.yaml @@ -0,0 +1,6 @@ +pr: 80104 +summary: Always compress cluster state on transport layer +area: Cluster Coordination +type: enhancement +issues: + - 79906 diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java index 39f73c34921b9..c0ad387c6fe03 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java @@ -19,12 +19,16 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.transport.AlwaysCompressedRequest; import java.io.IOException; import java.util.Arrays; import java.util.Map; -public class ClusterStateRequest extends MasterNodeReadRequest implements IndicesRequest.Replaceable { +public class ClusterStateRequest extends MasterNodeReadRequest + implements + IndicesRequest.Replaceable, + AlwaysCompressedRequest { public static final TimeValue DEFAULT_WAIT_FOR_NODE_TIMEOUT = TimeValue.timeValueMinutes(1); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ValidateJoinRequest.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ValidateJoinRequest.java index 688563984646c..0a24ed032fce2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ValidateJoinRequest.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ValidateJoinRequest.java @@ -10,12 +10,13 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.AlwaysCompressedRequest; import org.elasticsearch.transport.TransportRequest; import java.io.IOException; -public class ValidateJoinRequest extends TransportRequest { - private ClusterState state; +public class ValidateJoinRequest extends TransportRequest implements AlwaysCompressedRequest { + private final ClusterState state; public ValidateJoinRequest(StreamInput in) throws IOException { super(in); diff --git a/server/src/main/java/org/elasticsearch/transport/AlwaysCompressedRequest.java b/server/src/main/java/org/elasticsearch/transport/AlwaysCompressedRequest.java new file mode 100644 index 0000000000000..92fc51ca9bf87 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/AlwaysCompressedRequest.java @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.transport; + +/** + * A marker interface for requests that should always be compressed because they (or their responses) might get overwhelmingly large + * otherwise. + */ +public interface AlwaysCompressedRequest {} diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index 6b14b41630b8c..1053174c67218 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.BytesRef; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -131,7 +132,7 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st if (header.isError()) { handlerResponseError(streamInput, handler); } else { - handleResponse(remoteAddress, streamInput, handler); + handleResponse(remoteAddress, message.getHeader(), streamInput, handler); } // Check the entire message has been read final int nextByte = streamInput.read(); @@ -149,7 +150,7 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st } } else { assert header.isError() == false; - handleResponse(remoteAddress, EMPTY_STREAM_INPUT, handler); + handleResponse(remoteAddress, message.getHeader(), EMPTY_STREAM_INPUT, handler); } } } @@ -301,6 +302,7 @@ private static void sendErrorResponse(String actionName, TransportChannel transp private void handleResponse( InetSocketAddress remoteAddress, + final Header header, final StreamInput stream, final TransportResponseHandler handler ) { @@ -317,6 +319,7 @@ private void handleResponse( handleException(handler, serializationException); return; } + assert header.isCompressed() || (response instanceof ClusterStateResponse == false); final String executor = handler.executor(); if (ThreadPool.Names.SAME.equals(executor)) { doHandleResponse(handler, response); diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java b/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java index fdd823d430160..0af1bef0fd208 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java @@ -8,6 +8,7 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; +import org.elasticsearch.cluster.coordination.ValidateJoinRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -36,6 +37,7 @@ abstract class OutboundMessage extends NetworkMessage { ) { super(threadContext, version, status, requestId, compressionScheme); this.message = message; + assert isCompress() || (message instanceof ValidateJoinRequest == false); } BytesReference serialize(BytesStreamOutput bytesStream) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 6bed6c7fe6186..946b9dec27fdb 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -266,6 +266,7 @@ public void sendRequest(long requestId, String action, TransportRequest request, // is enabled and the request is a RawIndexingDataTransportRequest which indicates it should be // compressed. final boolean shouldCompress = compress == Compression.Enabled.TRUE + || (request instanceof AlwaysCompressedRequest) || (compress == Compression.Enabled.INDEXING_DATA && request instanceof RawIndexingDataTransportRequest && ((RawIndexingDataTransportRequest) request).isRawIndexingData());