11
11
12
12
import org .elasticsearch .Build ;
13
13
import org .elasticsearch .TransportVersion ;
14
+ import org .elasticsearch .TransportVersions ;
14
15
import org .elasticsearch .action .ActionListener ;
15
16
import org .elasticsearch .cluster .node .DiscoveryNode ;
16
17
import org .elasticsearch .common .bytes .BytesReference ;
17
18
import org .elasticsearch .common .io .stream .BytesStreamOutput ;
18
19
import org .elasticsearch .common .io .stream .StreamInput ;
19
20
import org .elasticsearch .common .io .stream .StreamOutput ;
21
+ import org .elasticsearch .common .logging .DeprecationCategory ;
22
+ import org .elasticsearch .common .logging .DeprecationLogger ;
20
23
import org .elasticsearch .common .metrics .CounterMetric ;
24
+ import org .elasticsearch .common .util .concurrent .ThreadContext ;
21
25
import org .elasticsearch .core .Strings ;
22
26
import org .elasticsearch .core .TimeValue ;
23
27
import org .elasticsearch .logging .LogManager ;
@@ -160,6 +164,7 @@ final class TransportHandshaker {
160
164
*/
161
165
162
166
private static final Logger logger = LogManager .getLogger (TransportHandshaker .class );
167
+ private static final DeprecationLogger deprecationLogger = DeprecationLogger .getLogger (logger .getName ());
163
168
164
169
static final TransportVersion V7_HANDSHAKE_VERSION = TransportVersion .fromId (6_08_00_99 );
165
170
static final TransportVersion V8_HANDSHAKE_VERSION = TransportVersion .fromId (7_17_00_99 );
@@ -171,6 +176,7 @@ final class TransportHandshaker {
171
176
);
172
177
173
178
static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake" ;
179
+ static final TransportVersion V8_18_FIRST_VERSION = TransportVersions .INDEXING_PRESSURE_THROTTLING_STATS ;
174
180
private final ConcurrentMap <Long , HandshakeResponseHandler > pendingHandshakes = new ConcurrentHashMap <>();
175
181
private final CounterMetric numHandshakes = new CounterMetric ();
176
182
@@ -246,17 +252,34 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
246
252
assert ignoreDeserializationErrors : exception ;
247
253
throw exception ;
248
254
}
249
- ensureCompatibleVersion (version , handshakeRequest .transportVersion , handshakeRequest .releaseVersion , channel );
255
+ ensureCompatibleVersion (
256
+ version ,
257
+ handshakeRequest .transportVersion ,
258
+ handshakeRequest .releaseVersion ,
259
+ channel ,
260
+ threadPool .getThreadContext ()
261
+ );
250
262
channel .sendResponse (new HandshakeResponse (this .version , Build .current ().version ()));
251
263
}
252
264
253
265
static void ensureCompatibleVersion (
254
266
TransportVersion localTransportVersion ,
255
267
TransportVersion remoteTransportVersion ,
256
- String releaseVersion ,
257
- Object channel
268
+ String remoteReleaseVersion ,
269
+ Object channel ,
270
+ ThreadContext threadContext
258
271
) {
259
272
if (TransportVersion .isCompatible (remoteTransportVersion )) {
273
+ // Prevent log message headers from being added to the handshake response.
274
+ try (var ignored = threadContext .stashContext ()) {
275
+ if (remoteTransportVersion .before (V8_18_FIRST_VERSION )) {
276
+ deprecationLogger .warn (
277
+ DeprecationCategory .OTHER ,
278
+ "handshake_version" ,
279
+ getDeprecationMessage (localTransportVersion , remoteTransportVersion , remoteReleaseVersion , channel )
280
+ );
281
+ }
282
+ }
260
283
if (remoteTransportVersion .onOrAfter (localTransportVersion )) {
261
284
// Remote is newer than us, so we will be using our transport protocol and it's up to the other end to decide whether it
262
285
// knows how to do that.
@@ -273,7 +296,7 @@ static void ensureCompatibleVersion(
273
296
"""
274
297
Rejecting unreadable transport handshake from remote node with version [%s/%s] received on [%s] since this node has \
275
298
version [%s/%s] which has an incompatible wire format.""" ,
276
- releaseVersion ,
299
+ remoteReleaseVersion ,
277
300
remoteTransportVersion ,
278
301
channel ,
279
302
Build .current ().version (),
@@ -284,6 +307,24 @@ static void ensureCompatibleVersion(
284
307
285
308
}
286
309
310
+ // Non-private for testing
311
+ static String getDeprecationMessage (
312
+ TransportVersion localTransportVersion ,
313
+ TransportVersion remoteTransportVersion ,
314
+ String remoteReleaseVersion ,
315
+ Object channel
316
+ ) {
317
+ return Strings .format (
318
+ "Performed a handshake with a remote node with version [%s/%s] received on [%s] which "
319
+ + "will be incompatible after this node on version [%s/%s] is upgraded to 9.x." ,
320
+ remoteReleaseVersion ,
321
+ remoteTransportVersion ,
322
+ channel ,
323
+ Build .current ().version (),
324
+ localTransportVersion
325
+ );
326
+ }
327
+
287
328
TransportResponseHandler <HandshakeResponse > removeHandlerForHandshake (long requestId ) {
288
329
return pendingHandshakes .remove (requestId );
289
330
}
@@ -323,7 +364,13 @@ public Executor executor() {
323
364
public void handleResponse (HandshakeResponse response ) {
324
365
if (isDone .compareAndSet (false , true )) {
325
366
ActionListener .completeWith (listener , () -> {
326
- ensureCompatibleVersion (version , response .getTransportVersion (), response .getReleaseVersion (), channel );
367
+ ensureCompatibleVersion (
368
+ version ,
369
+ response .getTransportVersion (),
370
+ response .getReleaseVersion (),
371
+ channel ,
372
+ threadPool .getThreadContext ()
373
+ );
327
374
final var resultVersion = TransportVersion .min (TransportHandshaker .this .version , response .getTransportVersion ());
328
375
assert TransportVersion .current ().before (version ) // simulating a newer-version transport service for test purposes
329
376
|| resultVersion .isKnown () : "negotiated unknown version " + resultVersion ;
0 commit comments