Skip to content

Commit 5655f1f

Browse files
committed
Allow extension of CapturingTransport by subclasses (#33012)
Today, CapturingTransport#createCapturingTransportService creates a transport service with a connection manager with reasonable default behaviours, but overriding this behaviour in a consumer is a litle tricky. Additionally, the default behaviour for opening a connection duplicates the content of the CapturingTransport#openConnection() method. This change removes this duplication by delegating to openConnection() and introduces overridable nodeConnected() and onSendRequest() methods so that consumers can alter this behaviour more easily. Relates #32246 in which we test the mechanisms for opening connections to unknown (and possibly unreachable) nodes.
1 parent d7733ba commit 5655f1f

File tree

1 file changed

+37
-50
lines changed

1 file changed

+37
-50
lines changed

test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java

+37-50
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import org.elasticsearch.transport.TransportStats;
5252

5353
import java.io.IOException;
54-
import java.net.UnknownHostException;
5554
import java.util.ArrayList;
5655
import java.util.Collection;
5756
import java.util.Collections;
@@ -66,11 +65,13 @@
6665

6766
import static org.apache.lucene.util.LuceneTestCase.rarely;
6867

69-
/** A transport class that doesn't send anything but rather captures all requests for inspection from tests */
68+
/**
69+
* A transport class that doesn't send anything but rather captures all requests for inspection from tests
70+
*/
7071
public class CapturingTransport implements Transport {
7172

7273
private volatile Map<String, RequestHandlerRegistry> requestHandlers = Collections.emptyMap();
73-
final Object requestHandlerMutex = new Object();
74+
private final Object requestHandlerMutex = new Object();
7475
private final ResponseHandlers responseHandlers = new ResponseHandlers();
7576
private TransportMessageListener listener;
7677

@@ -80,7 +81,7 @@ public static class CapturedRequest {
8081
public final String action;
8182
public final TransportRequest request;
8283

83-
public CapturedRequest(DiscoveryNode node, long requestId, String action, TransportRequest request) {
84+
CapturedRequest(DiscoveryNode node, long requestId, String action, TransportRequest request) {
8485
this.node = node;
8586
this.requestId = requestId;
8687
this.action = action;
@@ -96,41 +97,15 @@ public TransportService createCapturingTransportService(Settings settings, Threa
9697
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
9798
StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this, threadPool),
9899
settings, this, threadPool);
99-
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> true);
100-
connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> new Connection() {
101-
@Override
102-
public DiscoveryNode getNode() {
103-
return discoveryNode;
104-
}
105-
106-
@Override
107-
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
108-
throws TransportException {
109-
requests.put(requestId, Tuple.tuple(discoveryNode, action));
110-
capturedRequests.add(new CapturedRequest(discoveryNode, requestId, action, request));
111-
}
112-
113-
@Override
114-
public void addCloseListener(ActionListener<Void> listener) {
115-
116-
}
117-
118-
@Override
119-
public boolean isClosed() {
120-
return false;
121-
}
122-
123-
@Override
124-
public void close() {
125-
126-
}
127-
});
100+
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> nodeConnected(discoveryNode));
101+
connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> openConnection(discoveryNode, null));
128102
return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders,
129103
connectionManager);
130-
131104
}
132105

133-
/** returns all requests captured so far. Doesn't clear the captured request list. See {@link #clear()} */
106+
/**
107+
* returns all requests captured so far. Doesn't clear the captured request list. See {@link #clear()}
108+
*/
134109
public CapturedRequest[] capturedRequests() {
135110
return capturedRequests.toArray(new CapturedRequest[0]);
136111
}
@@ -178,12 +153,16 @@ public Map<String, List<CapturedRequest>> getCapturedRequestsByTargetNodeAndClea
178153
return groupRequestsByTargetNode(requests);
179154
}
180155

181-
/** clears captured requests */
156+
/**
157+
* clears captured requests
158+
*/
182159
public void clear() {
183160
capturedRequests.clear();
184161
}
185162

186-
/** simulate a response for the given requestId */
163+
/**
164+
* simulate a response for the given requestId
165+
*/
187166
public void handleResponse(final long requestId, final TransportResponse response) {
188167
responseHandlers.onResponseReceived(requestId, listener).handleResponse(response);
189168
}
@@ -194,7 +173,7 @@ public void handleResponse(final long requestId, final TransportResponse respons
194173
*
195174
* @param requestId the id corresponding to the captured send
196175
* request
197-
* @param t the failure to wrap
176+
* @param t the failure to wrap
198177
*/
199178
public void handleLocalError(final long requestId, final Throwable t) {
200179
Tuple<DiscoveryNode, String> request = requests.get(requestId);
@@ -208,7 +187,7 @@ public void handleLocalError(final long requestId, final Throwable t) {
208187
*
209188
* @param requestId the id corresponding to the captured send
210189
* request
211-
* @param t the failure to wrap
190+
* @param t the failure to wrap
212191
*/
213192
public void handleRemoteError(final long requestId, final Throwable t) {
214193
final RemoteTransportException remoteException;
@@ -234,7 +213,7 @@ public void handleRemoteError(final long requestId, final Throwable t) {
234213
*
235214
* @param requestId the id corresponding to the captured send
236215
* request
237-
* @param e the failure
216+
* @param e the failure
238217
*/
239218
public void handleError(final long requestId, final TransportException e) {
240219
responseHandlers.onResponseReceived(requestId, listener).handleException(e);
@@ -251,13 +230,11 @@ public DiscoveryNode getNode() {
251230
@Override
252231
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
253232
throws TransportException {
254-
requests.put(requestId, Tuple.tuple(node, action));
255-
capturedRequests.add(new CapturedRequest(node, requestId, action, request));
233+
onSendRequest(requestId, action, request, node);
256234
}
257235

258236
@Override
259237
public void addCloseListener(ActionListener<Void> listener) {
260-
261238
}
262239

263240
@Override
@@ -267,11 +244,19 @@ public boolean isClosed() {
267244

268245
@Override
269246
public void close() {
270-
271247
}
272248
};
273249
}
274250

251+
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
252+
requests.put(requestId, Tuple.tuple(node, action));
253+
capturedRequests.add(new CapturedRequest(node, requestId, action, request));
254+
}
255+
256+
protected boolean nodeConnected(DiscoveryNode discoveryNode) {
257+
return true;
258+
}
259+
275260
@Override
276261
public TransportStats getStats() {
277262
throw new UnsupportedOperationException();
@@ -288,7 +273,7 @@ public Map<String, BoundTransportAddress> profileBoundAddresses() {
288273
}
289274

290275
@Override
291-
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
276+
public TransportAddress[] addressesFromString(String address, int perAddressLimit) {
292277
return new TransportAddress[0];
293278
}
294279

@@ -299,22 +284,23 @@ public Lifecycle.State lifecycleState() {
299284

300285
@Override
301286
public void addLifecycleListener(LifecycleListener listener) {
302-
303287
}
304288

305289
@Override
306290
public void removeLifecycleListener(LifecycleListener listener) {
307-
308291
}
309292

310293
@Override
311-
public void start() {}
294+
public void start() {
295+
}
312296

313297
@Override
314-
public void stop() {}
298+
public void stop() {
299+
}
315300

316301
@Override
317-
public void close() {}
302+
public void close() {
303+
}
318304

319305
@Override
320306
public List<String> getLocalAddresses() {
@@ -330,6 +316,7 @@ public <Request extends TransportRequest> void registerRequestHandler(RequestHan
330316
requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
331317
}
332318
}
319+
333320
@Override
334321
public ResponseHandlers getResponseHandlers() {
335322
return responseHandlers;

0 commit comments

Comments
 (0)