Skip to content

Always compress cluster state on transport layer #80104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/80104.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 80104
summary: Always compress cluster state on transport layer
area: Cluster Coordination
type: enhancement
issues:
- 79906
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.AlwaysCompressedRequest;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateRequest> implements IndicesRequest.Replaceable {
public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateRequest>
implements
IndicesRequest.Replaceable,
AlwaysCompressedRequest {

public static final TimeValue DEFAULT_WAIT_FOR_NODE_TIMEOUT = TimeValue.timeValueMinutes(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.AlwaysCompressedRequest;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;

public class ValidateJoinRequest extends TransportRequest {
private ClusterState state;
public class ValidateJoinRequest extends TransportRequest implements AlwaysCompressedRequest {
private final ClusterState state;

public ValidateJoinRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.transport;

/**
* A marker interface for requests that should always be compressed because they (or their responses) might get overwhelmingly large
* otherwise.
*/
public interface AlwaysCompressedRequest {}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -131,7 +132,7 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st
if (header.isError()) {
handlerResponseError(streamInput, handler);
} else {
handleResponse(remoteAddress, streamInput, handler);
handleResponse(remoteAddress, message.getHeader(), streamInput, handler);
}
// Check the entire message has been read
final int nextByte = streamInput.read();
Expand All @@ -149,7 +150,7 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st
}
} else {
assert header.isError() == false;
handleResponse(remoteAddress, EMPTY_STREAM_INPUT, handler);
handleResponse(remoteAddress, message.getHeader(), EMPTY_STREAM_INPUT, handler);
}
}
}
Expand Down Expand Up @@ -301,6 +302,7 @@ private static void sendErrorResponse(String actionName, TransportChannel transp

private <T extends TransportResponse> void handleResponse(
InetSocketAddress remoteAddress,
final Header header,
final StreamInput stream,
final TransportResponseHandler<T> handler
) {
Expand All @@ -317,6 +319,7 @@ private <T extends TransportResponse> void handleResponse(
handleException(handler, serializationException);
return;
}
assert header.isCompressed() || (response instanceof ClusterStateResponse == false);
final String executor = handler.executor();
if (ThreadPool.Names.SAME.equals(executor)) {
doHandleResponse(handler, response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.transport;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.coordination.ValidateJoinRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -36,6 +37,7 @@ abstract class OutboundMessage extends NetworkMessage {
) {
super(threadContext, version, status, requestId, compressionScheme);
this.message = message;
assert isCompress() || (message instanceof ValidateJoinRequest == false);
}

BytesReference serialize(BytesStreamOutput bytesStream) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ public void sendRequest(long requestId, String action, TransportRequest request,
// is enabled and the request is a RawIndexingDataTransportRequest which indicates it should be
// compressed.
final boolean shouldCompress = compress == Compression.Enabled.TRUE
|| (request instanceof AlwaysCompressedRequest)
|| (compress == Compression.Enabled.INDEXING_DATA
&& request instanceof RawIndexingDataTransportRequest
&& ((RawIndexingDataTransportRequest) request).isRawIndexingData());
Expand Down