Skip to content

Commit 2b609fd

Browse files
committed
spring-projectsGH-1419: Fix Early Exit in NodeLocator
If a node was returned by the REST call and the node was not in the map of nodes to addreses, the loop exited early. The incorrect variable was being tested (never null). Also add a more sophisticated integration test - using 2 brokers, ensure that the correct broker is located for the queue.
1 parent 8b4dd86 commit 2b609fd

File tree

9 files changed

+158
-32
lines changed

9 files changed

+158
-32
lines changed

build.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ ext {
6767
springDataVersion = '2022.0.0-SNAPSHOT'
6868
springVersion = project.hasProperty('springVersion') ? project.springVersion : '6.0.0-SNAPSHOT'
6969
springRetryVersion = '2.0.0-SNAPSHOT'
70+
testContainersVersion = '1.17.3'
7071
zstdJniVersion = '1.5.0-2'
7172
}
7273

@@ -410,6 +411,7 @@ project('spring-rabbit') {
410411
testImplementation 'io.micrometer:micrometer-tracing-bridge-brave'
411412
testImplementation 'io.micrometer:micrometer-tracing-test'
412413
testImplementation 'io.micrometer:micrometer-tracing-integration-test'
414+
testImplementation "org.testcontainers:rabbitmq:$testContainersVersion"
413415
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core'
414416
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
415417
testRuntimeOnly 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
@@ -474,7 +476,7 @@ project('spring-rabbit-stream') {
474476
testRuntimeOnly "org.xerial.snappy:snappy-java:$snappyVersion"
475477
testRuntimeOnly "org.lz4:lz4-java:$lz4Version"
476478
testRuntimeOnly "com.github.luben:zstd-jni:$zstdJniVersion"
477-
testImplementation "org.testcontainers:rabbitmq:1.17.3"
479+
testImplementation "org.testcontainers:rabbitmq:$testContainersVersion"
478480
testImplementation "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
479481
testImplementation 'org.springframework:spring-webflux'
480482
}
@@ -495,6 +497,7 @@ project('spring-rabbit-junit') {
495497
api 'org.springframework:spring-web'
496498
api 'org.junit.jupiter:junit-jupiter-api'
497499
api "org.assertj:assertj-core:$assertjVersion"
500+
optionalApi "org.testcontainers:rabbitmq:$testContainersVersion"
498501
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
499502
optionalApi 'org.apache.logging.log4j:log4j-core'
500503
compileOnly 'org.apiguardian:apiguardian-api:1.0.0'
Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,20 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.rabbit.stream.support;
17+
package org.springframework.amqp.rabbit.junit;
1818

1919
import java.time.Duration;
2020

21-
import org.testcontainers.containers.GenericContainer;
2221
import org.testcontainers.containers.RabbitMQContainer;
2322

2423
/**
2524
* @author Gary Russell
2625
* @since 2.4
2726
*
2827
*/
29-
public abstract class AbstractIntegrationTests {
28+
public abstract class AbstractTestContainerTests {
3029

31-
static final GenericContainer<?> RABBITMQ;
30+
protected static final RabbitMQContainer RABBITMQ;
3231

3332
static {
3433
if (System.getProperty("spring.rabbit.use.local.server") == null
@@ -40,7 +39,7 @@ public abstract class AbstractIntegrationTests {
4039
}
4140
RABBITMQ = new RabbitMQContainer(image)
4241
.withExposedPorts(5672, 15672, 5552)
43-
.withPluginsEnabled("rabbitmq_stream", "rabbitmq_management")
42+
.withPluginsEnabled("rabbitmq_stream")
4443
.withStartupTimeout(Duration.ofMinutes(2));
4544
RABBITMQ.start();
4645
}
@@ -50,7 +49,7 @@ public abstract class AbstractIntegrationTests {
5049
}
5150

5251
public static int amqpPort() {
53-
return RABBITMQ != null ? RABBITMQ.getMappedPort(5672) : 5672;
52+
return RABBITMQ != null ? RABBITMQ.getAmqpPort() : 5672;
5453
}
5554

5655
public static int managementPort() {
@@ -61,4 +60,8 @@ public static int streamPort() {
6160
return RABBITMQ != null ? RABBITMQ.getMappedPort(5552) : 5552;
6261
}
6362

63+
public static String restUri() {
64+
return RABBITMQ.getHttpUrl() + "/api/";
65+
}
66+
6467
}

spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/SuperStreamProvisioningTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@
2828
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
2929
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
3030
import org.springframework.amqp.rabbit.core.RabbitAdmin;
31+
import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests;
3132
import org.springframework.beans.factory.annotation.Autowired;
3233
import org.springframework.context.annotation.Bean;
3334
import org.springframework.context.annotation.Configuration;
34-
import org.springframework.rabbit.stream.support.AbstractIntegrationTests;
3535
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
3636

3737
/**
@@ -40,7 +40,7 @@
4040
*
4141
*/
4242
@SpringJUnitConfig
43-
public class SuperStreamProvisioningTests extends AbstractIntegrationTests {
43+
public class SuperStreamProvisioningTests extends AbstractTestContainerTests {
4444

4545
@Test
4646
void provision(@Autowired Declarables declarables, @Autowired CachingConnectionFactory cf,

spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
4141
import org.springframework.amqp.rabbit.core.RabbitAdmin;
4242
import org.springframework.amqp.rabbit.core.RabbitTemplate;
43+
import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests;
4344
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
4445
import org.springframework.beans.factory.annotation.Autowired;
4546
import org.springframework.context.SmartLifecycle;
@@ -51,7 +52,6 @@
5152
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
5253
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
5354
import org.springframework.rabbit.stream.retry.StreamRetryOperationsInterceptorFactoryBean;
54-
import org.springframework.rabbit.stream.support.AbstractIntegrationTests;
5555
import org.springframework.rabbit.stream.support.StreamMessageProperties;
5656
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
5757
import org.springframework.test.annotation.DirtiesContext;
@@ -73,7 +73,7 @@
7373
*/
7474
@SpringJUnitConfig
7575
@DirtiesContext
76-
public class RabbitListenerTests extends AbstractIntegrationTests {
76+
public class RabbitListenerTests extends AbstractTestContainerTests {
7777

7878
@Autowired
7979
Config config;

spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamConcurrentSACTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@
3232
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
3333
import org.springframework.amqp.rabbit.core.RabbitAdmin;
3434
import org.springframework.amqp.rabbit.core.RabbitTemplate;
35+
import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests;
3536
import org.springframework.beans.factory.annotation.Autowired;
3637
import org.springframework.context.annotation.Bean;
3738
import org.springframework.context.annotation.Configuration;
3839
import org.springframework.rabbit.stream.config.SuperStream;
39-
import org.springframework.rabbit.stream.support.AbstractIntegrationTests;
4040
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
4141

4242
import com.rabbitmq.stream.Address;
@@ -49,7 +49,7 @@
4949
*
5050
*/
5151
@SpringJUnitConfig
52-
public class SuperStreamConcurrentSACTests extends AbstractIntegrationTests {
52+
public class SuperStreamConcurrentSACTests extends AbstractTestContainerTests {
5353

5454
@Test
5555
void concurrent(@Autowired StreamListenerContainer container, @Autowired RabbitTemplate template,

spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamSACTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@
3737
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
3838
import org.springframework.amqp.rabbit.core.RabbitAdmin;
3939
import org.springframework.amqp.rabbit.core.RabbitTemplate;
40+
import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests;
4041
import org.springframework.beans.factory.annotation.Autowired;
4142
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
4243
import org.springframework.context.ApplicationContext;
4344
import org.springframework.context.annotation.Bean;
4445
import org.springframework.context.annotation.Configuration;
4546
import org.springframework.context.annotation.Scope;
4647
import org.springframework.rabbit.stream.config.SuperStream;
47-
import org.springframework.rabbit.stream.support.AbstractIntegrationTests;
4848
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
4949

5050
import com.rabbitmq.stream.Address;
@@ -57,7 +57,7 @@
5757
*
5858
*/
5959
@SpringJUnitConfig
60-
public class SuperStreamSACTests extends AbstractIntegrationTests {
60+
public class SuperStreamSACTests extends AbstractTestContainerTests {
6161

6262
@Test
6363
void superStream(@Autowired ApplicationContext context, @Autowired RabbitTemplate template,

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ default ConnectionFactory locate(String[] adminUris, Map<String, String> nodeToA
398398
String node = (String) queueInfo.get("node");
399399
if (node != null) {
400400
String nodeUri = nodeToAddress.get(node);
401-
if (uri != null) {
401+
if (nodeUri != null) {
402402
close(client);
403403
return factoryFunction.locate(queue, node, nodeUri);
404404
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryIntegrationTests.java

Lines changed: 67 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,36 +19,53 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.mockito.Mockito.mock;
2121

22+
import java.net.URI;
23+
import java.net.URISyntaxException;
24+
import java.nio.charset.StandardCharsets;
25+
import java.time.Duration;
2226
import java.util.Map;
23-
import java.util.UUID;
2427

2528
import org.junit.jupiter.api.AfterEach;
2629
import org.junit.jupiter.api.BeforeEach;
2730
import org.junit.jupiter.api.Test;
2831

29-
import org.springframework.amqp.core.Queue;
32+
import org.springframework.amqp.core.AnonymousQueue;
3033
import org.springframework.amqp.rabbit.core.RabbitAdmin;
31-
import org.springframework.amqp.rabbit.core.RabbitTemplate;
34+
import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests;
3235
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
36+
import org.springframework.core.ParameterizedTypeReference;
37+
import org.springframework.http.MediaType;
38+
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
39+
import org.springframework.web.reactive.function.client.WebClient;
40+
import org.springframework.web.util.UriUtils;
3341

3442

3543
/**
3644
*
3745
* @author Gary Russell
3846
*/
3947
@RabbitAvailable(management = true, queues = "local")
40-
public class LocalizedQueueConnectionFactoryIntegrationTests {
48+
public class LocalizedQueueConnectionFactoryIntegrationTests extends AbstractTestContainerTests {
4149

4250
private LocalizedQueueConnectionFactory lqcf;
4351

4452
private CachingConnectionFactory defaultConnectionFactory;
4553

54+
private CachingConnectionFactory testContainerFactory;
55+
56+
private RabbitAdmin defaultAdmin;
57+
58+
private RabbitAdmin testContainerAdmin;
59+
4660
@BeforeEach
4761
public void setup() {
4862
this.defaultConnectionFactory = new CachingConnectionFactory("localhost");
49-
String[] addresses = new String[] { "localhost:9999", "localhost:5672" };
50-
String[] adminUris = new String[] { "http://localhost:15672", "http://localhost:15672" };
51-
String[] nodes = new String[] { "foo@bar", "rabbit@localhost" };
63+
this.defaultAdmin = new RabbitAdmin(this.defaultConnectionFactory);
64+
this.testContainerFactory = new CachingConnectionFactory("localhost", amqpPort());
65+
this.testContainerAdmin = new RabbitAdmin(this.testContainerFactory);
66+
String[] addresses = new String[] { "localhost:5672", "localhost:" + amqpPort() };
67+
String[] adminUris = new String[] { "http://localhost:15672", "http://localhost:" + managementPort() };
68+
String[] nodes = new String[] { "rabbit@localhost", findTcNode() };
5269
String vhost = "/";
5370
String username = "guest";
5471
String password = "guest";
@@ -60,18 +77,24 @@ public void setup() {
6077
public void tearDown() {
6178
this.lqcf.destroy();
6279
this.defaultConnectionFactory.destroy();
80+
this.testContainerFactory.destroy();
6381
}
6482

6583
@Test
66-
public void testConnect() throws Exception {
67-
RabbitAdmin admin = new RabbitAdmin(this.lqcf);
68-
Queue queue = new Queue(UUID.randomUUID().toString(), false, false, true);
69-
admin.declareQueue(queue);
70-
ConnectionFactory targetConnectionFactory = this.lqcf.getTargetConnectionFactory("[" + queue.getName() + "]");
71-
RabbitTemplate template = new RabbitTemplate(targetConnectionFactory);
72-
template.convertAndSend("", queue.getName(), "foo");
73-
assertThat(template.receiveAndConvert(queue.getName())).isEqualTo("foo");
74-
admin.deleteQueue(queue.getName());
84+
public void testFindCorrectConnection() throws Exception {
85+
AnonymousQueue externalQueue = new AnonymousQueue();
86+
AnonymousQueue tcQueue = new AnonymousQueue();
87+
this.defaultAdmin.declareQueue(externalQueue);
88+
this.testContainerAdmin.declareQueue(tcQueue);
89+
ConnectionFactory cf = this.lqcf
90+
.getTargetConnectionFactory("[" + externalQueue.getName() + "]");
91+
assertThat(cf).isNotSameAs(this.defaultConnectionFactory);
92+
assertThat(this.defaultAdmin.getQueueProperties(externalQueue.getName())).isNotNull();
93+
cf = this.lqcf.getTargetConnectionFactory("[" + tcQueue.getName() + "]");
94+
assertThat(cf).isNotSameAs(this.defaultConnectionFactory);
95+
assertThat(this.testContainerAdmin.getQueueProperties(tcQueue.getName())).isNotNull();
96+
this.defaultAdmin.deleteQueue(externalQueue.getName());
97+
this.testContainerAdmin.deleteQueue(tcQueue.getName());
7598
}
7699

77100
@Test
@@ -89,4 +112,32 @@ void findLocal() {
89112
lqcf.destroy();
90113
}
91114

115+
protected String findTcNode() {
116+
AnonymousQueue queue = new AnonymousQueue();
117+
this.testContainerAdmin.declareQueue(queue);
118+
URI uri;
119+
try {
120+
uri = new URI(restUri())
121+
.resolve("/api/queues/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8) + "/"
122+
+ queue.getName());
123+
}
124+
catch (URISyntaxException ex) {
125+
throw new IllegalStateException(ex);
126+
}
127+
WebClient client = WebClient.builder()
128+
.filter(ExchangeFilterFunctions.basicAuthentication(RABBITMQ.getAdminUsername(),
129+
RABBITMQ.getAdminPassword()))
130+
.build();
131+
Map<String, Object> queueInfo = client.get()
132+
.uri(uri)
133+
.accept(MediaType.APPLICATION_JSON)
134+
.retrieve()
135+
.bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {
136+
})
137+
.block(Duration.ofSeconds(10));
138+
this.testContainerAdmin.deleteQueue(queue.getName());
139+
return (String) queueInfo.get("node");
140+
}
141+
142+
92143
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.connection;
18+
19+
import static org.mockito.ArgumentMatchers.any;
20+
import static org.mockito.Mockito.spy;
21+
import static org.mockito.Mockito.times;
22+
import static org.mockito.Mockito.verify;
23+
24+
import java.net.URISyntaxException;
25+
import java.util.Map;
26+
27+
import org.junit.jupiter.api.DisplayName;
28+
import org.junit.jupiter.api.Test;
29+
30+
import org.springframework.amqp.rabbit.connection.LocalizedQueueConnectionFactory.NodeLocator;
31+
import org.springframework.lang.Nullable;
32+
33+
/**
34+
* @author Gary Russell
35+
* @since 3.0
36+
*
37+
*/
38+
public class NodeLocatorTests {
39+
40+
@Test
41+
@DisplayName("don't exit early when node to address missing")
42+
void missingNode() throws URISyntaxException {
43+
44+
NodeLocator<Object> nodeLocator = spy(new NodeLocator<Object>() {
45+
46+
@Override
47+
public Object createClient(String userName, String password) {
48+
return null;
49+
}
50+
51+
@Override
52+
@Nullable
53+
public Map<String, Object> restCall(Object client, String baseUri, String vhost, String queue) {
54+
if (baseUri.contains("foo")) {
55+
return Map.of("node", "c@d");
56+
}
57+
else {
58+
return Map.of("node", "a@b");
59+
}
60+
}
61+
});
62+
ConnectionFactory factory = nodeLocator.locate(new String[] { "http://foo", "http://bar" },
63+
Map.of("a@b", "baz"), null, "q", null, null, (q, n, u) -> {
64+
return null;
65+
});
66+
verify(nodeLocator, times(2)).restCall(any(), any(), any(), any());
67+
}
68+
69+
}

0 commit comments

Comments
 (0)