Skip to content

Commit caf08ca

Browse files
committed
Rename DefaultNodeLocator; add generics.
1 parent 859f926 commit caf08ca

File tree

5 files changed

+77
-44
lines changed

5 files changed

+77
-44
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java

+38-17
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package org.springframework.amqp.rabbit.connection;
1818

1919
import java.net.URI;
20-
import java.nio.charset.StandardCharsets;
20+
import java.net.URISyntaxException;
2121
import java.util.AbstractMap.SimpleImmutableEntry;
2222
import java.util.Arrays;
2323
import java.util.HashMap;
@@ -35,7 +35,6 @@
3535
import org.springframework.core.log.LogAccessor;
3636
import org.springframework.lang.Nullable;
3737
import org.springframework.util.Assert;
38-
import org.springframework.web.util.UriUtils;
3938

4039
/**
4140
* A {@link RoutingConnectionFactory} that determines the node on which a queue is located and
@@ -83,7 +82,7 @@ public class LocalizedQueueConnectionFactory implements ConnectionFactory, Routi
8382

8483
private final String trustStorePassPhrase;
8584

86-
private NodeLocator nodeLocator = new DefaultNodeLocator();
85+
private NodeLocator<?> nodeLocator = new WebFluxNodeLocator();
8786

8887
/**
8988
* @param defaultConnectionFactory the fallback connection factory to use if the queue
@@ -206,7 +205,7 @@ private static Map<String, String> nodesAddressesToMap(String[] nodes, String[]
206205
* @param nodeLocator the locator.
207206
* @since 2.4.8
208207
*/
209-
public void setNodeLocator(NodeLocator nodeLocator) {
208+
public void setNodeLocator(NodeLocator<?> nodeLocator) {
210209
this.nodeLocator = nodeLocator;
211210
}
212211

@@ -254,7 +253,8 @@ public void clearConnectionListeners() {
254253
public ConnectionFactory getTargetConnectionFactory(Object key) {
255254
String queue = ((String) key);
256255
queue = queue.substring(1, queue.length() - 1);
257-
Assert.isTrue(!queue.contains(","), () -> "Cannot use LocalizedQueueConnectionFactory with more than one queue: " + key);
256+
Assert.isTrue(!queue.contains(","),
257+
() -> "Cannot use LocalizedQueueConnectionFactory with more than one queue: " + key);
258258
ConnectionFactory connectionFactory = determineConnectionFactory(queue);
259259
if (connectionFactory == null) {
260260
return this.defaultConnectionFactory;
@@ -345,10 +345,11 @@ public void destroy() {
345345

346346
/**
347347
* Used to obtain a connection factory for the queue leader.
348+
*
349+
* @param <T> the client type.
348350
* @since 2.4.8
349351
*/
350-
@FunctionalInterface
351-
public interface NodeLocator {
352+
public interface NodeLocator<T> {
352353

353354
LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(NodeLocator.class));
354355

@@ -364,24 +365,26 @@ public interface NodeLocator {
364365
* @return a connection factory, if the leader node was found; null otherwise.
365366
*/
366367
@Nullable
367-
default ConnectionFactory locate(String[] adminUris, Map<String, String> nodeToAddress, String vhost, String username,
368-
String password, String queue, FactoryFinder factoryFunction) {
368+
default ConnectionFactory locate(String[] adminUris, Map<String, String> nodeToAddress, String vhost,
369+
String username, String password, String queue, FactoryFinder factoryFunction) {
370+
371+
T client = createClient(username, password);
369372

370373
for (int i = 0; i < adminUris.length; i++) {
371374
String adminUri = adminUris[i];
372375
if (!adminUri.endsWith("/api/")) {
373376
adminUri += "/api/";
374377
}
375378
try {
376-
URI uri = new URI(adminUri)
377-
.resolve("/api/queues/" + UriUtils.encodePathSegment(vhost, StandardCharsets.UTF_8) + "/"
378-
+ queue);
379-
HashMap<String, Object> queueInfo = restCall(username, password, uri);
379+
String uri = new URI(adminUri)
380+
.resolve("/api/queues/").toString();
381+
HashMap<String, Object> queueInfo = restCall(client, uri, vhost, queue);
380382
if (queueInfo != null) {
381383
String node = (String) queueInfo.get("node");
382384
if (node != null) {
383385
String nodeUri = nodeToAddress.get(node);
384386
if (uri != null) {
387+
close(client);
385388
return factoryFunction.locate(queue, node, nodeUri);
386389
}
387390
if (LOGGER.isDebugEnabled()) {
@@ -399,17 +402,35 @@ default ConnectionFactory locate(String[] adminUris, Map<String, String> nodeToA
399402
}
400403
}
401404
LOGGER.warn("Failed to determine queue location for: " + queue + ", using default connection factory");
405+
close(client);
402406
return null;
403407
}
404408

405409
/**
406-
* Retrieve a map of queue properties using the RabbitMQ Management REST API.
407-
* @param username the user name.
410+
* Create a client for subsequent use.
411+
* @param userName the user name.
408412
* @param password the password.
409-
* @param uri the uri.
413+
* @return the client.
414+
*/
415+
T createClient(String userName, String password);
416+
417+
/**
418+
* Close the client.
419+
* @param client the client.
420+
*/
421+
default void close(T client) {
422+
}
423+
424+
/**
425+
* Retrieve a map of queue properties using the RabbitMQ Management REST API.
426+
* @param baseUri the base uri.
427+
* @param vhost the virtual host.
428+
* @param queue the queue name.
410429
* @return the map of queue properties.
430+
* @throws URISyntaxException if the syntax is bad.
411431
*/
412-
HashMap<String, Object> restCall(String username, String password, URI uri);
432+
HashMap<String, Object> restCall(T client, String baseUri, String vhost, String queue)
433+
throws URISyntaxException;
413434

414435
}
415436

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/DefaultNodeLocator.java renamed to spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/WebFluxNodeLocator.java

+12-5
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.springframework.amqp.rabbit.connection;
1818

1919
import java.net.URI;
20+
import java.net.URISyntaxException;
21+
import java.nio.charset.StandardCharsets;
2022
import java.time.Duration;
2123
import java.util.HashMap;
2224

@@ -25,19 +27,23 @@
2527
import org.springframework.http.MediaType;
2628
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
2729
import org.springframework.web.reactive.function.client.WebClient;
30+
import org.springframework.web.util.UriUtils;
2831

2932
/**
30-
* Default {@link NodeLocator} using the Spring WebFlux {@link WebClient}.
33+
* A {@link NodeLocator} using the Spring WebFlux {@link WebClient}.
3134
*
3235
* @author Gary Russell
3336
* @since 2.4.8
3437
*
3538
*/
36-
public class DefaultNodeLocator implements NodeLocator {
39+
public class WebFluxNodeLocator implements NodeLocator<WebClient> {
3740

3841
@Override
39-
public HashMap<String, Object> restCall(String username, String password, URI uri) {
40-
WebClient client = createClient(username, password);
42+
public HashMap<String, Object> restCall(WebClient client, String baseUri, String vhost, String queue)
43+
throws URISyntaxException {
44+
45+
URI uri = new URI(baseUri)
46+
.resolve("/api/queues/" + UriUtils.encodePathSegment(vhost, StandardCharsets.UTF_8) + "/" + queue);
4147
HashMap<String, Object> queueInfo = client.get()
4248
.uri(uri)
4349
.accept(MediaType.APPLICATION_JSON)
@@ -54,7 +60,8 @@ public HashMap<String, Object> restCall(String username, String password, URI ur
5460
* @param password the password.
5561
* @return The client.
5662
*/
57-
protected WebClient createClient(String username, String password) {
63+
@Override
64+
public WebClient createClient(String username, String password) {
5865
return WebClient.builder()
5966
.filter(ExchangeFilterFunctions.basicAuthentication(username, password))
6067
.build();

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryIntegrationTests.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
package org.springframework.amqp.rabbit.connection;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.Mockito.mock;
2021

22+
import java.util.Map;
2123
import java.util.UUID;
2224

2325
import org.junit.jupiter.api.AfterEach;
@@ -34,7 +36,7 @@
3436
*
3537
* @author Gary Russell
3638
*/
37-
@RabbitAvailable(management = true)
39+
@RabbitAvailable(management = true, queues = "local")
3840
public class LocalizedQueueConnectionFactoryIntegrationTests {
3941

4042
private LocalizedQueueConnectionFactory lqcf;
@@ -72,4 +74,16 @@ public void testConnect() throws Exception {
7274
admin.deleteQueue(queue.getName());
7375
}
7476

77+
@Test
78+
void findLocal() {
79+
ConnectionFactory defaultCf = mock(ConnectionFactory.class);
80+
LocalizedQueueConnectionFactory lqcf = new LocalizedQueueConnectionFactory(defaultCf,
81+
Map.of("rabbit@localhost", "localhost:5672"), new String[] { "http://localhost:15672" },
82+
"/", "guest", "guest", false, null);
83+
ConnectionFactory cf = lqcf.getTargetConnectionFactory("[local]");
84+
RabbitAdmin admin = new RabbitAdmin(cf);
85+
assertThat(admin.getQueueProperties("local")).isNotNull();
86+
lqcf.destroy();
87+
}
88+
7589
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryTests.java

+2-17
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
import org.mockito.ArgumentCaptor;
4343
import org.mockito.internal.stubbing.answers.CallsRealMethods;
4444

45-
import org.springframework.amqp.rabbit.core.RabbitAdmin;
46-
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
4745
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
4846
import org.springframework.amqp.utils.test.TestUtils;
4947
import org.springframework.beans.DirectFieldAccessor;
@@ -63,7 +61,6 @@
6361
* @author Gary Russell
6462
* @author Artem Bilan
6563
*/
66-
@RabbitAvailable(queues = "local")
6764
public class LocalizedQueueConnectionFactoryTests {
6865

6966
private final Map<String, Channel> channels = new HashMap<String, Channel>();
@@ -100,10 +97,10 @@ protected ConnectionFactory createConnectionFactory(String address, String node)
10097
}
10198

10299
};
103-
lqcf.setNodeLocator(new DefaultNodeLocator() {
100+
lqcf.setNodeLocator(new WebFluxNodeLocator() {
104101

105102
@Override
106-
protected WebClient createClient(String username, String password) {
103+
public WebClient createClient(String username, String password) {
107104
return firstServer.get() ? client1 : client2;
108105
}
109106

@@ -171,18 +168,6 @@ private WebClient doCreateClient(String uri, String username, String password, S
171168
.build();
172169
}
173170

174-
@Test
175-
void findLocal() {
176-
ConnectionFactory defaultCf = mock(ConnectionFactory.class);
177-
LocalizedQueueConnectionFactory lqcf = new LocalizedQueueConnectionFactory(defaultCf,
178-
Map.of("rabbit@localhost", "localhost:5672"), new String[] { "http://localhost:15672" },
179-
"/", "guest", "guest", false, null);
180-
ConnectionFactory cf = lqcf.getTargetConnectionFactory("[local]");
181-
RabbitAdmin admin = new RabbitAdmin(cf);
182-
assertThat(admin.getQueueProperties("local")).isNotNull();
183-
lqcf.destroy();
184-
}
185-
186171
@Test
187172
public void test2Queues() throws Exception {
188173
try {

src/reference/asciidoc/amqp.adoc

+10-4
Original file line numberDiff line numberDiff line change
@@ -796,7 +796,7 @@ Notice that the first three parameters are arrays of `addresses`, `adminUris`, a
796796
These are positional in that, when a container attempts to connect to a queue, it uses the admin API to determine which node is the lead for the queue and connects to the address in the same array position as that node.
797797

798798
IMPORTANT: Starting with version 3.0, the RabbitMQ `http-client` is no longer used to access the Rest API.
799-
Instead, by default, the `WebClient` from Spring Webflux is used; it is not added to the class path by default.
799+
Instead, by default, the `WebClient` from Spring Webflux is used; which is not added to the class path by default.
800800

801801
.Maven
802802
====
@@ -816,17 +816,23 @@ compile 'org.springframework.amqp:spring-rabbit'
816816
----
817817
====
818818

819-
You can also use other REST technology by extending `DefaultNodeLocator` and overriding its `restCall` method.
819+
You can also use other REST technology by implementing `LocalizedQueueConnectionFactory.NodeLocator` and overriding its `createClient, ``restCall`, and optionally, `close` methods.
820820

821821
====
822822
[source, java]
823823
----
824-
lqcf.setNodeLocator(new DefaultNodeLocator() {
824+
lqcf.setNodeLocator(new DefaultNodeLocator<MyClient>() {
825825
826826
@Override
827-
public HashMap<String, Object> restCall(String username, String password, URI uri) {
827+
public MyClient createClient(String userName, String password) {
828+
...
829+
}
830+
831+
@Override
832+
public HashMap<String, Object> restCall(MyClient client, URI uri) {
828833
...
829834
});
835+
830836
});
831837
----
832838
====

0 commit comments

Comments
 (0)