22
22
import org .apache .logging .log4j .LogManager ;
23
23
import org .apache .logging .log4j .Logger ;
24
24
import org .apache .logging .log4j .message .ParameterizedMessage ;
25
+ import org .elasticsearch .Build ;
25
26
import org .elasticsearch .Version ;
26
27
import org .elasticsearch .action .ActionListener ;
27
28
import org .elasticsearch .action .ActionListenerResponseHandler ;
37
38
import org .elasticsearch .common .io .stream .StreamOutput ;
38
39
import org .elasticsearch .common .io .stream .Writeable ;
39
40
import org .elasticsearch .common .lease .Releasable ;
41
+ import org .elasticsearch .common .logging .DeprecationLogger ;
40
42
import org .elasticsearch .common .logging .Loggers ;
41
43
import org .elasticsearch .common .regex .Regex ;
42
44
import org .elasticsearch .common .settings .ClusterSettings ;
73
75
import java .util .function .Predicate ;
74
76
import java .util .function .Supplier ;
75
77
76
- public class TransportService extends AbstractLifecycleComponent implements ReportingService <TransportInfo >, TransportMessageListener ,
77
- TransportConnectionListener {
78
+ public class TransportService extends AbstractLifecycleComponent
79
+ implements ReportingService <TransportInfo >, TransportMessageListener , TransportConnectionListener {
80
+
78
81
private static final Logger logger = LogManager .getLogger (TransportService .class );
79
82
83
+ private static final String PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY = "es.unsafely_permit_handshake_from_incompatible_builds" ;
84
+ private static final boolean PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS ;
85
+
86
+ static {
87
+ final String value = System .getProperty (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY );
88
+ if (value == null ) {
89
+ PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS = false ;
90
+ } else if (Boolean .parseBoolean (value )) {
91
+ PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS = true ;
92
+ } else {
93
+ throw new IllegalArgumentException ("invalid value [" + value + "] for system property ["
94
+ + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "]" );
95
+ }
96
+ }
97
+
98
+
80
99
public static final String DIRECT_RESPONSE_PROFILE = ".direct" ;
81
100
public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake" ;
82
101
@@ -115,6 +134,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) {
115
134
private final RemoteClusterService remoteClusterService ;
116
135
117
136
private final boolean validateConnections ;
137
+ private final boolean requireCompatibleBuild ;
118
138
119
139
/** if set will call requests sent to this id to shortcut and executed locally */
120
140
volatile DiscoveryNode localNode = null ;
@@ -160,9 +180,15 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa
160
180
public TransportService (Settings settings , Transport transport , ThreadPool threadPool , TransportInterceptor transportInterceptor ,
161
181
Function <BoundTransportAddress , DiscoveryNode > localNodeFactory , @ Nullable ClusterSettings clusterSettings ,
162
182
Set <String > taskHeaders , ConnectionManager connectionManager ) {
183
+
184
+ final boolean isTransportClient = TransportClient .CLIENT_TYPE .equals (settings .get (Client .CLIENT_TYPE_SETTING_S .getKey ()));
185
+
186
+ // If we are a transport client then we skip the check that the remote node has a compatible build hash
187
+ this .requireCompatibleBuild = isTransportClient == false ;
188
+
163
189
// The only time we do not want to validate node connections is when this is a transport client using the simple node sampler
164
- this .validateConnections = TransportClient . CLIENT_TYPE . equals ( settings . get ( Client . CLIENT_TYPE_SETTING_S . getKey ())) == false ||
165
- TransportClient . CLIENT_TRANSPORT_SNIFF . get ( settings );
190
+ this .validateConnections = isTransportClient == false || TransportClient . CLIENT_TRANSPORT_SNIFF . get ( settings );
191
+
166
192
this .transport = transport ;
167
193
transport .setSlowLogThreshold (TransportSettings .SLOW_OPERATION_THRESHOLD_SETTING .get (settings ));
168
194
this .threadPool = threadPool ;
@@ -192,7 +218,14 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa
192
218
false , false ,
193
219
HandshakeRequest ::new ,
194
220
(request , channel , task ) -> channel .sendResponse (
195
- new HandshakeResponse (localNode , clusterName , localNode .getVersion ())));
221
+ new HandshakeResponse (localNode .getVersion (), Build .CURRENT .hash (), localNode , clusterName )));
222
+
223
+ if (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS ) {
224
+ logger .warn ("transport handshakes from incompatible builds are unsafely permitted on this node; remove system property [" +
225
+ PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] to resolve this warning" );
226
+ DeprecationLogger .getLogger (TransportService .class ).deprecate ("permit_handshake_from_incompatible_builds" ,
227
+ "system property [" + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] is deprecated and should be removed" );
228
+ }
196
229
}
197
230
198
231
public RemoteClusterService getRemoteClusterService () {
@@ -481,7 +514,7 @@ public void onFailure(Exception e) {
481
514
listener .onFailure (e );
482
515
}
483
516
}
484
- , HandshakeResponse :: new , ThreadPool .Names .GENERIC
517
+ , in -> new HandshakeResponse ( in , requireCompatibleBuild ) , ThreadPool .Names .GENERIC
485
518
));
486
519
}
487
520
@@ -503,28 +536,89 @@ private HandshakeRequest() {
503
536
}
504
537
505
538
public static class HandshakeResponse extends TransportResponse {
539
+
540
+ private static final Version BUILD_HASH_HANDSHAKE_VERSION = Version .V_7_11_0 ;
541
+
542
+ private final Version version ;
543
+
544
+ @ Nullable // if version < BUILD_HASH_HANDSHAKE_VERSION
545
+ private final String buildHash ;
546
+
506
547
private final DiscoveryNode discoveryNode ;
548
+
507
549
private final ClusterName clusterName ;
508
- private final Version version ;
509
550
510
- public HandshakeResponse (DiscoveryNode discoveryNode , ClusterName clusterName , Version version ) {
511
- this .discoveryNode = discoveryNode ;
512
- this .version = version ;
513
- this .clusterName = clusterName ;
551
+ public HandshakeResponse (Version version , String buildHash , DiscoveryNode discoveryNode , ClusterName clusterName ) {
552
+ this .buildHash = Objects .requireNonNull (buildHash );
553
+ this .discoveryNode = Objects .requireNonNull (discoveryNode );
554
+ this .version = Objects .requireNonNull (version );
555
+ this .clusterName = Objects .requireNonNull (clusterName );
514
556
}
515
557
516
- public HandshakeResponse (StreamInput in ) throws IOException {
558
+ public HandshakeResponse (StreamInput in , boolean requireCompatibleBuild ) throws IOException {
517
559
super (in );
518
- discoveryNode = in .readOptionalWriteable (DiscoveryNode ::new );
519
- clusterName = new ClusterName (in );
520
- version = Version .readVersion (in );
560
+ if (in .getVersion ().onOrAfter (BUILD_HASH_HANDSHAKE_VERSION )) {
561
+ // the first two fields need only VInts and raw (ASCII) characters, so we cross our fingers and hope that they appear
562
+ // on the wire as we expect them to even if this turns out to be an incompatible build
563
+ version = Version .readVersion (in );
564
+ buildHash = in .readString ();
565
+
566
+ try {
567
+ // If the remote node is incompatible then make an effort to identify it anyway, so we can mention it in the exception
568
+ // message, but recognise that this may fail
569
+ discoveryNode = new DiscoveryNode (in );
570
+ } catch (Exception e ) {
571
+ if (isIncompatibleBuild (version , buildHash , requireCompatibleBuild )) {
572
+ throw new IllegalArgumentException ("unidentifiable remote node is build [" + buildHash +
573
+ "] of version [" + version + "] but this node is build [" + Build .CURRENT .hash () +
574
+ "] of version [" + Version .CURRENT + "] which has an incompatible wire format" , e );
575
+ } else {
576
+ throw e ;
577
+ }
578
+ }
579
+
580
+ if (isIncompatibleBuild (version , buildHash , requireCompatibleBuild )) {
581
+ if (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS ) {
582
+ logger .warn ("remote node [{}] is build [{}] of version [{}] but this node is build [{}] of version [{}] " +
583
+ "which may not be compatible; remove system property [{}] to resolve this warning" ,
584
+ discoveryNode , buildHash , version , Build .CURRENT .hash (), Version .CURRENT ,
585
+ PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY );
586
+ } else {
587
+ throw new IllegalArgumentException ("remote node [" + discoveryNode + "] is build [" + buildHash +
588
+ "] of version [" + version + "] but this node is build [" + Build .CURRENT .hash () +
589
+ "] of version [" + Version .CURRENT + "] which has an incompatible wire format" );
590
+ }
591
+ }
592
+
593
+ clusterName = new ClusterName (in );
594
+ } else {
595
+ discoveryNode = in .readOptionalWriteable (DiscoveryNode ::new );
596
+ clusterName = new ClusterName (in );
597
+ version = Version .readVersion (in );
598
+ buildHash = null ;
599
+ }
521
600
}
522
601
523
602
@ Override
524
603
public void writeTo (StreamOutput out ) throws IOException {
525
- out .writeOptionalWriteable (discoveryNode );
526
- clusterName .writeTo (out );
527
- Version .writeVersion (version , out );
604
+ if (out .getVersion ().onOrAfter (BUILD_HASH_HANDSHAKE_VERSION )) {
605
+ Version .writeVersion (version , out );
606
+ out .writeString (buildHash );
607
+ discoveryNode .writeTo (out );
608
+ clusterName .writeTo (out );
609
+ } else {
610
+ out .writeOptionalWriteable (discoveryNode );
611
+ clusterName .writeTo (out );
612
+ Version .writeVersion (version , out );
613
+ }
614
+ }
615
+
616
+ public Version getVersion () {
617
+ return version ;
618
+ }
619
+
620
+ public String getBuildHash () {
621
+ return buildHash ;
528
622
}
529
623
530
624
public DiscoveryNode getDiscoveryNode () {
@@ -534,6 +628,10 @@ public DiscoveryNode getDiscoveryNode() {
534
628
public ClusterName getClusterName () {
535
629
return clusterName ;
536
630
}
631
+
632
+ private static boolean isIncompatibleBuild (Version version , String buildHash , boolean requireCompatibleBuild ) {
633
+ return requireCompatibleBuild && version == Version .CURRENT && Build .CURRENT .hash ().equals (buildHash ) == false ;
634
+ }
537
635
}
538
636
539
637
public void disconnectFromNode (DiscoveryNode node ) {
@@ -1353,4 +1451,5 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder)
1353
1451
}
1354
1452
}
1355
1453
}
1454
+
1356
1455
}
0 commit comments