19
19
20
20
package org .elasticsearch .discovery ;
21
21
22
+ import org .apache .logging .log4j .Level ;
23
+ import org .apache .logging .log4j .LogManager ;
24
+ import org .apache .logging .log4j .Logger ;
22
25
import org .apache .lucene .util .SetOnce ;
26
+ import org .elasticsearch .ElasticsearchException ;
23
27
import org .elasticsearch .Version ;
24
28
import org .elasticsearch .action .ActionListener ;
25
29
import org .elasticsearch .cluster .ClusterName ;
26
30
import org .elasticsearch .cluster .node .DiscoveryNode ;
31
+ import org .elasticsearch .common .Nullable ;
32
+ import org .elasticsearch .common .logging .Loggers ;
27
33
import org .elasticsearch .common .settings .Settings ;
34
+ import org .elasticsearch .common .transport .TransportAddress ;
28
35
import org .elasticsearch .test .ESTestCase ;
36
+ import org .elasticsearch .test .MockLogAppender ;
37
+ import org .elasticsearch .test .junit .annotations .TestLogging ;
29
38
import org .elasticsearch .test .transport .MockTransport ;
30
39
import org .elasticsearch .threadpool .TestThreadPool ;
31
40
import org .elasticsearch .threadpool .ThreadPool ;
41
+ import org .elasticsearch .transport .ConnectTransportException ;
42
+ import org .elasticsearch .transport .TransportException ;
32
43
import org .elasticsearch .transport .TransportRequest ;
33
44
import org .elasticsearch .transport .TransportService ;
34
45
import org .elasticsearch .transport .TransportService .HandshakeResponse ;
44
55
import static org .elasticsearch .discovery .HandshakingTransportAddressConnector .PROBE_HANDSHAKE_TIMEOUT_SETTING ;
45
56
import static org .elasticsearch .node .Node .NODE_NAME_SETTING ;
46
57
import static org .hamcrest .Matchers .equalTo ;
58
+ import static org .hamcrest .Matchers .notNullValue ;
59
+ import static org .hamcrest .Matchers .oneOf ;
47
60
48
61
public class HandshakingTransportAddressConnectorTests extends ESTestCase {
49
62
50
63
private DiscoveryNode remoteNode ;
64
+ private TransportAddress discoveryAddress ;
51
65
private TransportService transportService ;
52
66
private ThreadPool threadPool ;
53
67
private String remoteClusterName ;
54
68
private HandshakingTransportAddressConnector handshakingTransportAddressConnector ;
55
69
private DiscoveryNode localNode ;
56
70
57
71
private boolean dropHandshake ;
72
+ @ Nullable // unless we want the full connection to fail
73
+ private TransportException fullConnectionFailure ;
58
74
59
75
@ Before
60
76
public void startServices () {
@@ -66,17 +82,24 @@ public void startServices() {
66
82
threadPool = new TestThreadPool ("node" , settings );
67
83
68
84
remoteNode = null ;
85
+ discoveryAddress = null ;
69
86
remoteClusterName = null ;
70
87
dropHandshake = false ;
88
+ fullConnectionFailure = null ;
71
89
72
90
final MockTransport mockTransport = new MockTransport () {
73
91
@ Override
74
92
protected void onSendRequest (long requestId , String action , TransportRequest request , DiscoveryNode node ) {
75
93
super .onSendRequest (requestId , action , request , node );
76
94
assertThat (action , equalTo (TransportService .HANDSHAKE_ACTION_NAME ));
77
- assertEquals (remoteNode .getAddress (), node .getAddress ());
95
+ assertThat (discoveryAddress , notNullValue ());
96
+ assertThat (node .getAddress (), oneOf (discoveryAddress , remoteNode .getAddress ()));
78
97
if (dropHandshake == false ) {
79
- handleResponse (requestId , new HandshakeResponse (remoteNode , new ClusterName (remoteClusterName ), Version .CURRENT ));
98
+ if (fullConnectionFailure != null && node .getAddress ().equals (remoteNode .getAddress ())) {
99
+ handleError (requestId , fullConnectionFailure );
100
+ } else {
101
+ handleResponse (requestId , new HandshakeResponse (remoteNode , new ClusterName (remoteClusterName ), Version .CURRENT ));
102
+ }
80
103
}
81
104
}
82
105
};
@@ -91,7 +114,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
91
114
}
92
115
93
116
@ After
94
- public void stopServices () throws InterruptedException {
117
+ public void stopServices () {
95
118
transportService .stop ();
96
119
terminate (threadPool );
97
120
}
@@ -102,8 +125,9 @@ public void testConnectsToMasterNode() throws InterruptedException {
102
125
103
126
remoteNode = new DiscoveryNode ("remote-node" , buildNewFakeTransportAddress (), Version .CURRENT );
104
127
remoteClusterName = "local-cluster" ;
128
+ discoveryAddress = getDiscoveryAddress ();
105
129
106
- handshakingTransportAddressConnector .connectToRemoteMasterNode (remoteNode . getAddress () , new ActionListener <DiscoveryNode >() {
130
+ handshakingTransportAddressConnector .connectToRemoteMasterNode (discoveryAddress , new ActionListener <DiscoveryNode >() {
107
131
@ Override
108
132
public void onResponse (DiscoveryNode discoveryNode ) {
109
133
receivedNode .set (discoveryNode );
@@ -120,44 +144,84 @@ public void onFailure(Exception e) {
120
144
assertEquals (remoteNode , receivedNode .get ());
121
145
}
122
146
147
+ @ TestLogging (reason ="ensure logging happens" , value ="org.elasticsearch.discovery.HandshakingTransportAddressConnector:INFO" )
148
+ public void testLogsFullConnectionFailureAfterSuccessfulHandshake () throws Exception {
149
+
150
+ remoteNode = new DiscoveryNode ("remote-node" , buildNewFakeTransportAddress (), Version .CURRENT );
151
+ remoteClusterName = "local-cluster" ;
152
+ discoveryAddress = buildNewFakeTransportAddress ();
153
+
154
+ fullConnectionFailure = new ConnectTransportException (remoteNode , "simulated" , new ElasticsearchException ("root cause" ));
155
+
156
+ FailureListener failureListener = new FailureListener ();
157
+
158
+ MockLogAppender mockAppender = new MockLogAppender ();
159
+ mockAppender .start ();
160
+ mockAppender .addExpectation (
161
+ new MockLogAppender .SeenEventExpectation (
162
+ "message" ,
163
+ HandshakingTransportAddressConnector .class .getCanonicalName (),
164
+ Level .WARN ,
165
+ "*completed handshake with [*] but followup connection failed*" ));
166
+ Logger targetLogger = LogManager .getLogger (HandshakingTransportAddressConnector .class );
167
+ Loggers .addAppender (targetLogger , mockAppender );
168
+
169
+ try {
170
+ handshakingTransportAddressConnector .connectToRemoteMasterNode (discoveryAddress , failureListener );
171
+ failureListener .assertFailure ();
172
+ mockAppender .assertAllExpectationsMatched ();
173
+ } finally {
174
+ Loggers .removeAppender (targetLogger , mockAppender );
175
+ mockAppender .stop ();
176
+ }
177
+ }
178
+
123
179
public void testDoesNotConnectToNonMasterNode () throws InterruptedException {
124
180
remoteNode = new DiscoveryNode ("remote-node" , buildNewFakeTransportAddress (), emptyMap (), emptySet (), Version .CURRENT );
181
+ discoveryAddress = getDiscoveryAddress ();
125
182
remoteClusterName = "local-cluster" ;
126
183
127
184
FailureListener failureListener = new FailureListener ();
128
- handshakingTransportAddressConnector .connectToRemoteMasterNode (remoteNode . getAddress () , failureListener );
185
+ handshakingTransportAddressConnector .connectToRemoteMasterNode (discoveryAddress , failureListener );
129
186
failureListener .assertFailure ();
130
187
}
131
188
132
189
public void testDoesNotConnectToLocalNode () throws Exception {
133
190
remoteNode = localNode ;
191
+ discoveryAddress = getDiscoveryAddress ();
134
192
remoteClusterName = "local-cluster" ;
135
193
136
194
FailureListener failureListener = new FailureListener ();
137
- handshakingTransportAddressConnector .connectToRemoteMasterNode (remoteNode . getAddress () , failureListener );
195
+ handshakingTransportAddressConnector .connectToRemoteMasterNode (discoveryAddress , failureListener );
138
196
failureListener .assertFailure ();
139
197
}
140
198
141
199
public void testDoesNotConnectToDifferentCluster () throws InterruptedException {
142
200
remoteNode = new DiscoveryNode ("remote-node" , buildNewFakeTransportAddress (), Version .CURRENT );
201
+ discoveryAddress = getDiscoveryAddress ();
143
202
remoteClusterName = "another-cluster" ;
144
203
145
204
FailureListener failureListener = new FailureListener ();
146
- handshakingTransportAddressConnector .connectToRemoteMasterNode (remoteNode . getAddress () , failureListener );
205
+ handshakingTransportAddressConnector .connectToRemoteMasterNode (discoveryAddress , failureListener );
147
206
failureListener .assertFailure ();
148
207
}
149
208
150
209
public void testHandshakeTimesOut () throws InterruptedException {
151
210
remoteNode = new DiscoveryNode ("remote-node" , buildNewFakeTransportAddress (), Version .CURRENT );
211
+ discoveryAddress = getDiscoveryAddress ();
152
212
remoteClusterName = "local-cluster" ;
153
213
dropHandshake = true ;
154
214
155
215
FailureListener failureListener = new FailureListener ();
156
- handshakingTransportAddressConnector .connectToRemoteMasterNode (remoteNode . getAddress () , failureListener );
216
+ handshakingTransportAddressConnector .connectToRemoteMasterNode (discoveryAddress , failureListener );
157
217
Thread .sleep (PROBE_HANDSHAKE_TIMEOUT_SETTING .get (Settings .EMPTY ).millis ());
158
218
failureListener .assertFailure ();
159
219
}
160
220
221
+ private TransportAddress getDiscoveryAddress () {
222
+ return randomBoolean () ? remoteNode .getAddress () : buildNewFakeTransportAddress ();
223
+ }
224
+
161
225
private class FailureListener implements ActionListener <DiscoveryNode > {
162
226
final CountDownLatch completionLatch = new CountDownLatch (1 );
163
227
0 commit comments