Skip to content

Commit 5b801c8

Browse files
committed
Rename DefaultNodeLocator; add generics.
1 parent 859f926 commit 5b801c8

File tree

4 files changed

+47
-22
lines changed

4 files changed

+47
-22
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java

+30-11
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public class LocalizedQueueConnectionFactory implements ConnectionFactory, Routi
8383

8484
private final String trustStorePassPhrase;
8585

86-
private NodeLocator nodeLocator = new DefaultNodeLocator();
86+
private NodeLocator<?> nodeLocator = new WebFluxNodeLocator();
8787

8888
/**
8989
* @param defaultConnectionFactory the fallback connection factory to use if the queue
@@ -206,7 +206,7 @@ private static Map<String, String> nodesAddressesToMap(String[] nodes, String[]
206206
* @param nodeLocator the locator.
207207
* @since 2.4.8
208208
*/
209-
public void setNodeLocator(NodeLocator nodeLocator) {
209+
public void setNodeLocator(NodeLocator<?> nodeLocator) {
210210
this.nodeLocator = nodeLocator;
211211
}
212212

@@ -254,7 +254,8 @@ public void clearConnectionListeners() {
254254
public ConnectionFactory getTargetConnectionFactory(Object key) {
255255
String queue = ((String) key);
256256
queue = queue.substring(1, queue.length() - 1);
257-
Assert.isTrue(!queue.contains(","), () -> "Cannot use LocalizedQueueConnectionFactory with more than one queue: " + key);
257+
Assert.isTrue(!queue.contains(","),
258+
() -> "Cannot use LocalizedQueueConnectionFactory with more than one queue: " + key);
258259
ConnectionFactory connectionFactory = determineConnectionFactory(queue);
259260
if (connectionFactory == null) {
260261
return this.defaultConnectionFactory;
@@ -345,10 +346,11 @@ public void destroy() {
345346

346347
/**
347348
* Used to obtain a connection factory for the queue leader.
349+
*
350+
* @param <T> the client type.
348351
* @since 2.4.8
349352
*/
350-
@FunctionalInterface
351-
public interface NodeLocator {
353+
public interface NodeLocator<T> {
352354

353355
LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(NodeLocator.class));
354356

@@ -364,8 +366,10 @@ public interface NodeLocator {
364366
* @return a connection factory, if the leader node was found; null otherwise.
365367
*/
366368
@Nullable
367-
default ConnectionFactory locate(String[] adminUris, Map<String, String> nodeToAddress, String vhost, String username,
368-
String password, String queue, FactoryFinder factoryFunction) {
369+
default ConnectionFactory locate(String[] adminUris, Map<String, String> nodeToAddress, String vhost,
370+
String username, String password, String queue, FactoryFinder factoryFunction) {
371+
372+
T client = createClient(username, password);
369373

370374
for (int i = 0; i < adminUris.length; i++) {
371375
String adminUri = adminUris[i];
@@ -376,12 +380,13 @@ default ConnectionFactory locate(String[] adminUris, Map<String, String> nodeToA
376380
URI uri = new URI(adminUri)
377381
.resolve("/api/queues/" + UriUtils.encodePathSegment(vhost, StandardCharsets.UTF_8) + "/"
378382
+ queue);
379-
HashMap<String, Object> queueInfo = restCall(username, password, uri);
383+
HashMap<String, Object> queueInfo = restCall(client, uri);
380384
if (queueInfo != null) {
381385
String node = (String) queueInfo.get("node");
382386
if (node != null) {
383387
String nodeUri = nodeToAddress.get(node);
384388
if (uri != null) {
389+
close(client);
385390
return factoryFunction.locate(queue, node, nodeUri);
386391
}
387392
if (LOGGER.isDebugEnabled()) {
@@ -399,17 +404,31 @@ default ConnectionFactory locate(String[] adminUris, Map<String, String> nodeToA
399404
}
400405
}
401406
LOGGER.warn("Failed to determine queue location for: " + queue + ", using default connection factory");
407+
close(client);
402408
return null;
403409
}
404410

405411
/**
406-
* Retrieve a map of queue properties using the RabbitMQ Management REST API.
407-
* @param username the user name.
412+
* Create a client for subsequent use.
413+
* @param userName the user name.
408414
* @param password the password.
415+
* @return the client.
416+
*/
417+
T createClient(String userName, String password);
418+
419+
/**
420+
* Close the client.
421+
* @param client the client.
422+
*/
423+
default void close(T client) {
424+
}
425+
426+
/**
427+
* Retrieve a map of queue properties using the RabbitMQ Management REST API.
409428
* @param uri the uri.
410429
* @return the map of queue properties.
411430
*/
412-
HashMap<String, Object> restCall(String username, String password, URI uri);
431+
HashMap<String, Object> restCall(T client, URI uri);
413432

414433
}
415434

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/DefaultNodeLocator.java renamed to spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/WebFluxNodeLocator.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,16 @@
2727
import org.springframework.web.reactive.function.client.WebClient;
2828

2929
/**
30-
* Default {@link NodeLocator} using the Spring WebFlux {@link WebClient}.
30+
* A {@link NodeLocator} using the Spring WebFlux {@link WebClient}.
3131
*
3232
* @author Gary Russell
3333
* @since 2.4.8
3434
*
3535
*/
36-
public class DefaultNodeLocator implements NodeLocator {
36+
public class WebFluxNodeLocator implements NodeLocator<WebClient> {
3737

3838
@Override
39-
public HashMap<String, Object> restCall(String username, String password, URI uri) {
40-
WebClient client = createClient(username, password);
39+
public HashMap<String, Object> restCall(WebClient client, URI uri) {
4140
HashMap<String, Object> queueInfo = client.get()
4241
.uri(uri)
4342
.accept(MediaType.APPLICATION_JSON)
@@ -54,7 +53,8 @@ public HashMap<String, Object> restCall(String username, String password, URI ur
5453
* @param password the password.
5554
* @return The client.
5655
*/
57-
protected WebClient createClient(String username, String password) {
56+
@Override
57+
public WebClient createClient(String username, String password) {
5858
return WebClient.builder()
5959
.filter(ExchangeFilterFunctions.basicAuthentication(username, password))
6060
.build();

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,10 @@ protected ConnectionFactory createConnectionFactory(String address, String node)
100100
}
101101

102102
};
103-
lqcf.setNodeLocator(new DefaultNodeLocator() {
103+
lqcf.setNodeLocator(new WebFluxNodeLocator() {
104104

105105
@Override
106-
protected WebClient createClient(String username, String password) {
106+
public WebClient createClient(String username, String password) {
107107
return firstServer.get() ? client1 : client2;
108108
}
109109

src/reference/asciidoc/amqp.adoc

+10-4
Original file line numberDiff line numberDiff line change
@@ -796,7 +796,7 @@ Notice that the first three parameters are arrays of `addresses`, `adminUris`, a
796796
These are positional in that, when a container attempts to connect to a queue, it uses the admin API to determine which node is the lead for the queue and connects to the address in the same array position as that node.
797797

798798
IMPORTANT: Starting with version 3.0, the RabbitMQ `http-client` is no longer used to access the Rest API.
799-
Instead, by default, the `WebClient` from Spring Webflux is used; it is not added to the class path by default.
799+
Instead, by default, the `WebClient` from Spring Webflux is used; which is not added to the class path by default.
800800

801801
.Maven
802802
====
@@ -816,17 +816,23 @@ compile 'org.springframework.amqp:spring-rabbit'
816816
----
817817
====
818818

819-
You can also use other REST technology by extending `DefaultNodeLocator` and overriding its `restCall` method.
819+
You can also use other REST technology by implementing `LocalizedQueueConnectionFactory.NodeLocator` and overriding its `createClient, ``restCall`, and optionally, `close` methods.
820820

821821
====
822822
[source, java]
823823
----
824-
lqcf.setNodeLocator(new DefaultNodeLocator() {
824+
lqcf.setNodeLocator(new DefaultNodeLocator<MyClient>() {
825825
826826
@Override
827-
public HashMap<String, Object> restCall(String username, String password, URI uri) {
827+
public MyClient createClient(String userName, String password) {
828+
...
829+
}
830+
831+
@Override
832+
public HashMap<String, Object> restCall(MyClient client, URI uri) {
828833
...
829834
});
835+
830836
});
831837
----
832838
====

0 commit comments

Comments
 (0)