Skip to content

Commit 0645ee8

Browse files
authored
Send cluster name and discovery node in handshake (#48916)
This commits sends the cluster name and discovery naode in the transport level handshake response. This will allow us to stop sending the transport service level handshake request in the 8.0-8.x release cycle. It is necessary to start sending this in 7.x so that 8.0 is guaranteed to be communicating with a version that sends the required information.
1 parent c320b49 commit 0645ee8

File tree

10 files changed

+136
-34
lines changed

10 files changed

+136
-34
lines changed

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.Version;
2828
import org.elasticsearch.action.ActionListener;
2929
import org.elasticsearch.action.support.ThreadedActionListener;
30+
import org.elasticsearch.cluster.ClusterName;
3031
import org.elasticsearch.cluster.node.DiscoveryNode;
3132
import org.elasticsearch.common.Booleans;
3233
import org.elasticsearch.common.Strings;
@@ -151,7 +152,7 @@ public TcpTransport(Settings settings, Version version, ThreadPool threadPool, P
151152
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.IN_FLIGHT_REQUESTS);
152153

153154
this.outboundHandler = new OutboundHandler(nodeName, version, features, threadPool, bigArrays);
154-
this.handshaker = new TransportHandshaker(version, threadPool,
155+
this.handshaker = new TransportHandshaker(ClusterName.CLUSTER_NAME_SETTING.get(settings), version, threadPool,
155156
(node, channel, requestId, v) -> outboundHandler.sendRequest(node, channel, requestId,
156157
TransportHandshaker.HANDSHAKE_ACTION_NAME, new TransportHandshaker.HandshakeRequest(version),
157158
TransportRequestOptions.EMPTY, v, false, true),
@@ -167,6 +168,11 @@ public TcpTransport(Settings settings, Version version, ThreadPool threadPool, P
167168
protected void doStart() {
168169
}
169170

171+
@Override
172+
public void setLocalNode(DiscoveryNode localNode) {
173+
handshaker.setLocalNode(localNode);
174+
}
175+
170176
@Override
171177
public synchronized void setMessageListener(TransportMessageListener listener) {
172178
outboundHandler.setMessageListener(listener);

server/src/main/java/org/elasticsearch/transport/Transport.java

+2
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public interface Transport extends LifecycleComponent {
5252

5353
void setMessageListener(TransportMessageListener listener);
5454

55+
void setLocalNode(DiscoveryNode localNode);
56+
5557
/**
5658
* The address the transport is bound on.
5759
*/

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

+58-14
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.elasticsearch.Version;
2222
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.cluster.ClusterName;
2324
import org.elasticsearch.cluster.node.DiscoveryNode;
2425
import org.elasticsearch.common.bytes.BytesReference;
2526
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -46,19 +47,26 @@ final class TransportHandshaker {
4647
private final ConcurrentMap<Long, HandshakeResponseHandler> pendingHandshakes = new ConcurrentHashMap<>();
4748
private final CounterMetric numHandshakes = new CounterMetric();
4849

50+
private final ClusterName clusterName;
4951
private final Version version;
5052
private final ThreadPool threadPool;
5153
private final HandshakeRequestSender handshakeRequestSender;
5254
private final HandshakeResponseSender handshakeResponseSender;
55+
private volatile DiscoveryNode localNode;
5356

54-
TransportHandshaker(Version version, ThreadPool threadPool, HandshakeRequestSender handshakeRequestSender,
57+
TransportHandshaker(ClusterName clusterName, Version version, ThreadPool threadPool, HandshakeRequestSender handshakeRequestSender,
5558
HandshakeResponseSender handshakeResponseSender) {
59+
this.clusterName = clusterName;
5660
this.version = version;
5761
this.threadPool = threadPool;
5862
this.handshakeRequestSender = handshakeRequestSender;
5963
this.handshakeResponseSender = handshakeResponseSender;
6064
}
6165

66+
void setLocalNode(DiscoveryNode localNode) {
67+
this.localNode = localNode;
68+
}
69+
6270
void sendHandshake(long requestId, DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
6371
numHandshakes.inc();
6472
final HandshakeResponseHandler handler = new HandshakeResponseHandler(requestId, version, listener);
@@ -89,14 +97,17 @@ void sendHandshake(long requestId, DiscoveryNode node, TcpChannel channel, TimeV
8997
}
9098

9199
void handleHandshake(Version version, Set<String> features, TcpChannel channel, long requestId, StreamInput stream) throws IOException {
100+
// The TransportService blocks incoming requests until this has been set.
101+
assert localNode != null : "Local node must be set before handshake is handled";
102+
92103
// Must read the handshake request to exhaust the stream
93104
HandshakeRequest handshakeRequest = new HandshakeRequest(stream);
94105
final int nextByte = stream.read();
95106
if (nextByte != -1) {
96107
throw new IllegalStateException("Handshake request not fully read for requestId [" + requestId + "], action ["
97108
+ TransportHandshaker.HANDSHAKE_ACTION_NAME + "], available [" + stream.available() + "]; resetting");
98109
}
99-
HandshakeResponse response = new HandshakeResponse(this.version);
110+
HandshakeResponse response = new HandshakeResponse(handshakeRequest.version, this.version, this.clusterName, this.localNode);
100111
handshakeResponseSender.sendResponse(version, features, channel, response, requestId);
101112
}
102113

@@ -127,13 +138,13 @@ private HandshakeResponseHandler(long requestId, Version currentVersion, ActionL
127138

128139
@Override
129140
public HandshakeResponse read(StreamInput in) throws IOException {
130-
return new HandshakeResponse(in);
141+
return new HandshakeResponse(this.currentVersion, in);
131142
}
132143

133144
@Override
134145
public void handleResponse(HandshakeResponse response) {
135146
if (isDone.compareAndSet(false, true)) {
136-
Version version = response.responseVersion;
147+
Version version = response.version;
137148
if (currentVersion.isCompatible(version) == false) {
138149
listener.onFailure(new IllegalStateException("Received message from unsupported version: [" + version
139150
+ "] minimal compatible version is: [" + currentVersion.minimumCompatibilityVersion() + "]"));
@@ -201,25 +212,58 @@ public void writeTo(StreamOutput streamOutput) throws IOException {
201212

202213
static final class HandshakeResponse extends TransportResponse {
203214

204-
private final Version responseVersion;
205-
206-
HandshakeResponse(Version responseVersion) {
207-
this.responseVersion = responseVersion;
215+
private final Version requestVersion;
216+
private final Version version;
217+
private final ClusterName clusterName;
218+
private final DiscoveryNode discoveryNode;
219+
220+
HandshakeResponse(Version requestVersion, Version responseVersion, ClusterName clusterName, DiscoveryNode discoveryNode) {
221+
this.requestVersion = requestVersion;
222+
this.version = responseVersion;
223+
this.clusterName = clusterName;
224+
this.discoveryNode = discoveryNode;
208225
}
209226

210-
private HandshakeResponse(StreamInput in) throws IOException {
227+
private HandshakeResponse(Version requestVersion, StreamInput in) throws IOException {
211228
super(in);
212-
responseVersion = Version.readVersion(in);
229+
this.requestVersion = requestVersion;
230+
version = Version.readVersion(in);
231+
// During the handshake process, nodes set their stream version to the minimum compatibility
232+
// version they support. When deserializing the response, we use the version the other node
233+
// told us that it actually is in the handshake response (`version`).
234+
if (requestVersion.onOrAfter(Version.V_7_6_0) && version.onOrAfter(Version.V_7_6_0)) {
235+
clusterName = new ClusterName(in);
236+
discoveryNode = new DiscoveryNode(in);
237+
} else {
238+
clusterName = null;
239+
discoveryNode = null;
240+
}
213241
}
214242

215243
@Override
216244
public void writeTo(StreamOutput out) throws IOException {
217-
assert responseVersion != null;
218-
Version.writeVersion(responseVersion, out);
245+
assert version != null;
246+
Version.writeVersion(version, out);
247+
// During the handshake process, nodes set their stream version to the minimum compatibility
248+
// version they support. When deciding what response to send, we use the version the other node
249+
// told us that it actually is in the handshake request (`requestVersion`). If it did not tell
250+
// us a `requestVersion`, it is at least a pre-7.6 node.
251+
if (requestVersion != null && requestVersion.onOrAfter(Version.V_7_6_0) && version.onOrAfter(Version.V_7_6_0)) {
252+
clusterName.writeTo(out);
253+
discoveryNode.writeTo(out);
254+
}
255+
}
256+
257+
Version getVersion() {
258+
return version;
259+
}
260+
261+
ClusterName getClusterName() {
262+
return clusterName;
219263
}
220264

221-
Version getResponseVersion() {
222-
return responseVersion;
265+
DiscoveryNode getDiscoveryNode() {
266+
return discoveryNode;
223267
}
224268
}
225269

server/src/main/java/org/elasticsearch/transport/TransportService.java

+1
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ protected void doStart() {
235235
}
236236
}
237237
localNode = localNodeFactory.apply(transport.boundAddress());
238+
transport.setLocalNode(localNode);
238239

239240
if (connectToRemoteCluster) {
240241
// here we start to connect to the remote clusters

server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java

+5
Original file line numberDiff line numberDiff line change
@@ -229,4 +229,9 @@ public RequestHandlerRegistry getRequestHandler(String action) {
229229
public void setMessageListener(TransportMessageListener listener) {
230230
this.listener = listener;
231231
}
232+
233+
@Override
234+
public void setLocalNode(DiscoveryNode localNode) {
235+
236+
}
232237
}

server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java

+4
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,10 @@ public RequestHandlerRegistry getRequestHandler(String action) {
415415
public void setMessageListener(TransportMessageListener listener) {
416416
}
417417

418+
@Override
419+
public void setLocalNode(DiscoveryNode localNode) {
420+
}
421+
418422
@Override
419423
public BoundTransportAddress boundAddress() {
420424
return null;

server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.ElasticsearchException;
2323
import org.elasticsearch.Version;
24+
import org.elasticsearch.cluster.ClusterName;
2425
import org.elasticsearch.common.bytes.BytesArray;
2526
import org.elasticsearch.common.bytes.BytesReference;
2627
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -58,9 +59,8 @@ public void setUp() throws Exception {
5859
channel = new FakeTcpChannel(randomBoolean(), buildNewFakeTransportAddress().address(), buildNewFakeTransportAddress().address());
5960
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
6061
InboundMessage.Reader reader = new InboundMessage.Reader(version, namedWriteableRegistry, threadPool.getThreadContext());
61-
TransportHandshaker handshaker = new TransportHandshaker(version, threadPool, (n, c, r, v) -> {
62-
}, (v, f, c, r, r_id) -> {
63-
});
62+
TransportHandshaker handshaker = new TransportHandshaker(new ClusterName("cluster-name"), version, threadPool, (n, c, r, v) -> {
63+
}, (v, f, c, r, r_id) -> {});
6464
TransportKeepAlive keepAlive = new TransportKeepAlive(threadPool, TcpChannel::sendMessage);
6565
OutboundHandler outboundHandler =
6666
new OutboundHandler("node", version, new String[0], threadPool, BigArrays.NON_RECYCLING_INSTANCE);

server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java

+47-16
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,20 @@
2020

2121
import org.elasticsearch.Version;
2222
import org.elasticsearch.action.support.PlainActionFuture;
23+
import org.elasticsearch.cluster.ClusterName;
2324
import org.elasticsearch.cluster.node.DiscoveryNode;
25+
import org.elasticsearch.common.SuppressForbidden;
2426
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2527
import org.elasticsearch.common.io.stream.StreamInput;
28+
import org.elasticsearch.common.transport.TransportAddress;
2629
import org.elasticsearch.common.unit.TimeValue;
2730
import org.elasticsearch.tasks.TaskId;
2831
import org.elasticsearch.test.ESTestCase;
2932
import org.elasticsearch.threadpool.TestThreadPool;
3033
import org.mockito.ArgumentCaptor;
3134

3235
import java.io.IOException;
36+
import java.net.InetAddress;
3337
import java.util.Collections;
3438
import java.util.concurrent.TimeUnit;
3539

@@ -42,23 +46,29 @@
4246
public class TransportHandshakerTests extends ESTestCase {
4347

4448
private TransportHandshaker handshaker;
45-
private DiscoveryNode node;
49+
private DiscoveryNode remoteNode;
4650
private TcpChannel channel;
4751
private TestThreadPool threadPool;
4852
private TransportHandshaker.HandshakeRequestSender requestSender;
4953
private TransportHandshaker.HandshakeResponseSender responseSender;
54+
private ClusterName clusterName;
55+
private DiscoveryNode localNode;
5056

5157
@Override
58+
@SuppressForbidden(reason = "Allow accessing localhost")
5259
public void setUp() throws Exception {
5360
super.setUp();
54-
String nodeId = "node-id";
61+
String nodeId = "remote-node-id";
5562
channel = mock(TcpChannel.class);
5663
requestSender = mock(TransportHandshaker.HandshakeRequestSender.class);
5764
responseSender = mock(TransportHandshaker.HandshakeResponseSender.class);
58-
node = new DiscoveryNode(nodeId, nodeId, nodeId, "host", "host_address", buildNewFakeTransportAddress(), Collections.emptyMap(),
59-
Collections.emptySet(), Version.CURRENT);
65+
remoteNode = new DiscoveryNode(nodeId, nodeId, nodeId, "host", "host_address", buildNewFakeTransportAddress(),
66+
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
6067
threadPool = new TestThreadPool("thread-poll");
61-
handshaker = new TransportHandshaker(Version.CURRENT, threadPool, requestSender, responseSender);
68+
clusterName = new ClusterName("cluster");
69+
localNode = new DiscoveryNode("local-node-id", new TransportAddress(InetAddress.getLocalHost(), 0), Version.CURRENT);
70+
handshaker = new TransportHandshaker(clusterName, Version.CURRENT, threadPool, requestSender, responseSender);
71+
handshaker.setLocalNode(localNode);
6272
}
6373

6474
@Override
@@ -70,9 +80,9 @@ public void tearDown() throws Exception {
7080
public void testHandshakeRequestAndResponse() throws IOException {
7181
PlainActionFuture<Version> versionFuture = PlainActionFuture.newFuture();
7282
long reqId = randomLongBetween(1, 10);
73-
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
83+
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
7484

75-
verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
85+
verify(requestSender).sendRequest(remoteNode, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
7686

7787
assertFalse(versionFuture.isDone());
7888

@@ -88,18 +98,39 @@ public void testHandshakeRequestAndResponse() throws IOException {
8898
verify(responseSender).sendResponse(eq(Version.CURRENT), eq(Collections.emptySet()), eq(mockChannel), responseCaptor.capture(),
8999
eq(reqId));
90100

101+
91102
TransportResponseHandler<TransportHandshaker.HandshakeResponse> handler = handshaker.removeHandlerForHandshake(reqId);
92103
handler.handleResponse((TransportHandshaker.HandshakeResponse) responseCaptor.getValue());
93104

94105
assertTrue(versionFuture.isDone());
95106
assertEquals(Version.CURRENT, versionFuture.actionGet());
107+
TransportHandshaker.HandshakeResponse response = (TransportHandshaker.HandshakeResponse) responseCaptor.getValue();
108+
assertEquals(Version.CURRENT, response.getVersion());
109+
assertEquals(clusterName, response.getClusterName());
110+
assertEquals(localNode, response.getDiscoveryNode());
111+
}
112+
113+
public void testHandshakeRequestAndResponsePreV7_6() throws IOException {
114+
PlainActionFuture<Version> versionFuture = PlainActionFuture.newFuture();
115+
long reqId = randomLongBetween(1, 10);
116+
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
117+
118+
TransportResponseHandler<TransportHandshaker.HandshakeResponse> handler = handshaker.removeHandlerForHandshake(reqId);
119+
try (BytesStreamOutput out = new BytesStreamOutput()) {
120+
new TransportHandshaker.HandshakeResponse(Version.V_7_5_0, Version.V_7_5_0, clusterName, localNode).writeTo(out);
121+
TransportHandshaker.HandshakeResponse response = handler.read(out.bytes().streamInput());
122+
assertEquals(Version.V_7_5_0, response.getVersion());
123+
// When writing or reading a 6.6 stream, these are not serialized
124+
assertNull(response.getDiscoveryNode());
125+
assertNull(response.getClusterName());
126+
}
96127
}
97128

98129
public void testHandshakeRequestFutureVersionsCompatibility() throws IOException {
99130
long reqId = randomLongBetween(1, 10);
100-
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), PlainActionFuture.newFuture());
131+
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(30, TimeUnit.SECONDS), PlainActionFuture.newFuture());
101132

102-
verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
133+
verify(requestSender).sendRequest(remoteNode, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
103134

104135
TcpChannel mockChannel = mock(TcpChannel.class);
105136
TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(Version.CURRENT);
@@ -131,15 +162,15 @@ public void testHandshakeRequestFutureVersionsCompatibility() throws IOException
131162

132163
TransportHandshaker.HandshakeResponse response = (TransportHandshaker.HandshakeResponse) responseCaptor.getValue();
133164

134-
assertEquals(Version.CURRENT, response.getResponseVersion());
165+
assertEquals(Version.CURRENT, response.getVersion());
135166
}
136167

137168
public void testHandshakeError() throws IOException {
138169
PlainActionFuture<Version> versionFuture = PlainActionFuture.newFuture();
139170
long reqId = randomLongBetween(1, 10);
140-
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
171+
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
141172

142-
verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
173+
verify(requestSender).sendRequest(remoteNode, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
143174

144175
assertFalse(versionFuture.isDone());
145176

@@ -155,9 +186,9 @@ public void testSendRequestThrowsException() throws IOException {
155186
PlainActionFuture<Version> versionFuture = PlainActionFuture.newFuture();
156187
long reqId = randomLongBetween(1, 10);
157188
Version compatibilityVersion = Version.CURRENT.minimumCompatibilityVersion();
158-
doThrow(new IOException("boom")).when(requestSender).sendRequest(node, channel, reqId, compatibilityVersion);
189+
doThrow(new IOException("boom")).when(requestSender).sendRequest(remoteNode, channel, reqId, compatibilityVersion);
159190

160-
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
191+
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
161192

162193
assertTrue(versionFuture.isDone());
163194
ConnectTransportException cte = expectThrows(ConnectTransportException.class, versionFuture::actionGet);
@@ -168,9 +199,9 @@ public void testSendRequestThrowsException() throws IOException {
168199
public void testHandshakeTimeout() throws IOException {
169200
PlainActionFuture<Version> versionFuture = PlainActionFuture.newFuture();
170201
long reqId = randomLongBetween(1, 10);
171-
handshaker.sendHandshake(reqId, node, channel, new TimeValue(100, TimeUnit.MILLISECONDS), versionFuture);
202+
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(100, TimeUnit.MILLISECONDS), versionFuture);
172203

173-
verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
204+
verify(requestSender).sendRequest(remoteNode, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
174205

175206
ConnectTransportException cte = expectThrows(ConnectTransportException.class, versionFuture::actionGet);
176207
assertThat(cte.getMessage(), containsString("handshake_timeout"));

0 commit comments

Comments
 (0)