Skip to content

Commit 2bc0841

Browse files
committed
spring-projectsGH-1419: Remove RabbitMQ http-client Usage
Resolves spring-projects#1419 Use Spring WebFlux instead, while allowing the user to choose some other technology in the `LocalizedQueueConnectionFactory`..
1 parent 639eb16 commit 2bc0841

File tree

16 files changed

+468
-396
lines changed

16 files changed

+468
-396
lines changed

build.gradle

+8-10
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ ext {
4444
assertkVersion = '0.24'
4545
awaitilityVersion = '4.2.0'
4646
commonsCompressVersion = '1.20'
47-
commonsHttpClientVersion = '4.5.13'
47+
commonsHttpClientVersion = '5.1.3'
4848
commonsPoolVersion = '2.11.1'
4949
googleJsr305Version = '3.0.2'
5050
hamcrestVersion = '2.2'
@@ -62,7 +62,6 @@ ext {
6262
mockitoVersion = '4.8.0'
6363
rabbitmqStreamVersion = '0.8.0'
6464
rabbitmqVersion = project.hasProperty('rabbitmqVersion') ? project.rabbitmqVersion : '5.16.0'
65-
rabbitmqHttpClientVersion = '3.12.1'
6665
reactorVersion = '2022.0.0-SNAPSHOT'
6766
snappyVersion = '1.1.8.4'
6867
springDataVersion = '2022.0.0-SNAPSHOT'
@@ -384,11 +383,12 @@ project('spring-rabbit') {
384383

385384
api project(':spring-amqp')
386385
api "com.rabbitmq:amqp-client:$rabbitmqVersion"
387-
optionalApi "com.rabbitmq:http-client:$rabbitmqHttpClientVersion"
388386
optionalApi 'org.springframework:spring-aop'
389387
api 'org.springframework:spring-context'
390388
api 'org.springframework:spring-messaging'
391389
api 'org.springframework:spring-tx'
390+
optionalApi 'org.springframework:spring-web'
391+
optionalApi 'org.springframework:spring-webflux'
392392
optionalApi 'io.projectreactor:reactor-core'
393393
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
394394
optionalApi 'org.apache.logging.log4j:log4j-core'
@@ -410,7 +410,7 @@ project('spring-rabbit') {
410410
testImplementation 'io.micrometer:micrometer-tracing-test'
411411
testImplementation 'io.micrometer:micrometer-tracing-integration-test'
412412
testRuntimeOnly 'org.springframework:spring-web'
413-
testRuntimeOnly "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion"
413+
testRuntimeOnly "org.apache.httpcomponents.client5:httpclient5:$commonsHttpClientVersion"
414414
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core'
415415
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
416416
testRuntimeOnly 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
@@ -465,14 +465,13 @@ project('spring-rabbit-stream') {
465465

466466
api project(':spring-rabbit')
467467
api "com.rabbitmq:stream-client:$rabbitmqStreamVersion"
468-
optionalApi "com.rabbitmq:http-client:$rabbitmqHttpClientVersion"
469468

470469
testApi project(':spring-rabbit-junit')
471470
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core'
472471
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
473472
testRuntimeOnly 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
474473
testRuntimeOnly 'com.fasterxml.jackson.module:jackson-module-kotlin'
475-
testRuntimeOnly "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion"
474+
testRuntimeOnly "org.apache.httpcomponents.client5:httpclient5:$commonsHttpClientVersion"
476475
testRuntimeOnly "org.apache.commons:commons-compress:$commonsCompressVersion"
477476
testRuntimeOnly "org.xerial.snappy:snappy-java:$snappyVersion"
478477
testRuntimeOnly "org.lz4:lz4-java:$lz4Version"
@@ -494,16 +493,15 @@ project('spring-rabbit-junit') {
494493
exclude group: 'org.hamcrest', module: 'hamcrest-core'
495494
}
496495
api "com.rabbitmq:amqp-client:$rabbitmqVersion"
497-
api ("com.rabbitmq:http-client:$rabbitmqHttpClientVersion") {
498-
exclude group: 'org.springframework', module: 'spring-web'
499-
}
500496
api 'org.springframework:spring-web'
497+
api 'org.springframework:spring-webflux'
501498
api 'org.junit.jupiter:junit-jupiter-api'
502499
api "org.assertj:assertj-core:$assertjVersion"
503500
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
504501
optionalApi 'org.apache.logging.log4j:log4j-core'
505502
compileOnly 'org.apiguardian:apiguardian-api:1.0.0'
506-
503+
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core'
504+
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
507505
}
508506

509507
}

spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/BrokerRunningSupport.java

+29-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,8 +17,11 @@
1717
package org.springframework.amqp.rabbit.junit;
1818

1919
import java.io.IOException;
20+
import java.net.URI;
2021
import java.net.URISyntaxException;
2122
import java.nio.ByteBuffer;
23+
import java.nio.charset.StandardCharsets;
24+
import java.time.Duration;
2225
import java.util.ArrayList;
2326
import java.util.Arrays;
2427
import java.util.HashMap;
@@ -30,13 +33,17 @@
3033
import org.apache.commons.logging.Log;
3134
import org.apache.commons.logging.LogFactory;
3235

36+
import org.springframework.core.ParameterizedTypeReference;
37+
import org.springframework.http.MediaType;
3338
import org.springframework.util.Base64Utils;
3439
import org.springframework.util.StringUtils;
40+
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
41+
import org.springframework.web.reactive.function.client.WebClient;
42+
import org.springframework.web.util.UriUtils;
3543

3644
import com.rabbitmq.client.Channel;
3745
import com.rabbitmq.client.Connection;
3846
import com.rabbitmq.client.ConnectionFactory;
39-
import com.rabbitmq.http.client.Client;
4047

4148
/**
4249
* A class that can be used to prevent integration tests from failing if the Rabbit broker application is
@@ -372,15 +379,33 @@ private Channel createQueues(Connection connection) throws IOException, URISynta
372379
}
373380
}
374381
if (this.management) {
375-
Client client = new Client(getAdminUri(), this.adminUser, this.adminPassword);
376-
if (!client.alivenessTest("/")) {
382+
if (!alivenessTest()) {
377383
throw new BrokerNotAliveException("Aliveness test failed for localhost:15672 guest/quest; "
378384
+ "management not available");
379385
}
380386
}
381387
return channel;
382388
}
383389

390+
private boolean alivenessTest() throws URISyntaxException {
391+
WebClient client = WebClient.builder()
392+
.filter(ExchangeFilterFunctions.basicAuthentication(this.adminUser, this.adminPassword))
393+
.build();
394+
URI uri = new URI(getAdminUri())
395+
.resolve("/api/aliveness-test/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8));
396+
HashMap<String, String> result = client.get()
397+
.uri(uri)
398+
.accept(MediaType.APPLICATION_JSON)
399+
.retrieve()
400+
.bodyToMono(new ParameterizedTypeReference<HashMap<String, String>>() {
401+
})
402+
.block(Duration.ofSeconds(10)); // NOSONAR magic#
403+
if (result != null) {
404+
return result.get("status").equals("ok");
405+
}
406+
return false;
407+
}
408+
384409
public static boolean fatal() {
385410
String serversRequired = System.getenv(BROKER_REQUIRED);
386411
if (Boolean.parseBoolean(serversRequired)) {

spring-rabbit-junit/src/test/java/org/springframework/amqp/rabbit/junit/RabbitAvailableTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,7 +30,7 @@
3030
* @since 2.0.2
3131
*
3232
*/
33-
@RabbitAvailable(queues = "rabbitAvailableTests.queue")
33+
@RabbitAvailable(queues = "rabbitAvailableTests.queue", management = true)
3434
public class RabbitAvailableTests {
3535

3636
@Test

spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java

+40-7
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.net.URI;
22+
import java.net.URISyntaxException;
23+
import java.nio.charset.StandardCharsets;
24+
import java.time.Duration;
2125
import java.util.ArrayList;
2226
import java.util.List;
27+
import java.util.Map;
2328
import java.util.concurrent.CountDownLatch;
2429
import java.util.concurrent.Future;
2530
import java.util.concurrent.TimeUnit;
2631
import java.util.concurrent.atomic.AtomicBoolean;
2732

28-
import org.junit.jupiter.api.Disabled;
2933
import org.junit.jupiter.api.Test;
3034

3135
import org.springframework.amqp.core.Queue;
@@ -42,6 +46,8 @@
4246
import org.springframework.context.annotation.Bean;
4347
import org.springframework.context.annotation.Configuration;
4448
import org.springframework.context.annotation.DependsOn;
49+
import org.springframework.core.ParameterizedTypeReference;
50+
import org.springframework.http.MediaType;
4551
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
4652
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
4753
import org.springframework.rabbit.stream.retry.StreamRetryOperationsInterceptorFactoryBean;
@@ -50,9 +56,10 @@
5056
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
5157
import org.springframework.test.annotation.DirtiesContext;
5258
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
59+
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
60+
import org.springframework.web.reactive.function.client.WebClient;
61+
import org.springframework.web.util.UriUtils;
5362

54-
import com.rabbitmq.http.client.Client;
55-
import com.rabbitmq.http.client.domain.QueueInfo;
5663
import com.rabbitmq.stream.Address;
5764
import com.rabbitmq.stream.Environment;
5865
import com.rabbitmq.stream.Message;
@@ -99,12 +106,38 @@ void nativeMsg(@Autowired RabbitTemplate template) throws InterruptedException {
99106
assertThat(this.config.latch4.await(10, TimeUnit.SECONDS)).isTrue();
100107
}
101108

109+
@SuppressWarnings("unchecked")
102110
@Test
103-
@Disabled("Temporary until SF uses Micrometer snaps")
104111
void queueOverAmqp() throws Exception {
105-
Client client = new Client("http://guest:guest@localhost:" + managementPort() + "/api");
106-
QueueInfo queue = client.getQueue("/", "stream.created.over.amqp");
107-
assertThat(queue.getArguments().get("x-queue-type")).isEqualTo("stream");
112+
WebClient client = WebClient.builder()
113+
.filter(ExchangeFilterFunctions.basicAuthentication("guest", "guest"))
114+
.build();
115+
Map<String, Object> queue = queueInfo("stream.created.over.amqp");
116+
assertThat(((Map<String, Object>) queue.get("arguments")).get("x-queue-type")).isEqualTo("stream");
117+
}
118+
119+
private Map<String, Object> queueInfo(String queueName) throws URISyntaxException {
120+
WebClient client = createClient("guest", "guest");
121+
URI uri = queueUri(queueName);
122+
return client.get()
123+
.uri(uri)
124+
.accept(MediaType.APPLICATION_JSON)
125+
.retrieve()
126+
.bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {
127+
})
128+
.block(Duration.ofSeconds(10));
129+
}
130+
131+
private URI queueUri(String queue) throws URISyntaxException {
132+
URI uri = new URI("http://localhost:" + managementPort() + "/api")
133+
.resolve("/api/queues/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8) + "/" + queue);
134+
return uri;
135+
}
136+
137+
private WebClient createClient(String adminUser, String adminPassword) {
138+
return WebClient.builder()
139+
.filter(ExchangeFilterFunctions.basicAuthentication(adminUser, adminPassword))
140+
.build();
108141
}
109142

110143
@Configuration(proxyBeanMethods = false)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.connection;
18+
19+
import java.net.URI;
20+
import java.time.Duration;
21+
import java.util.HashMap;
22+
23+
import org.springframework.amqp.rabbit.connection.LocalizedQueueConnectionFactory.NodeLocator;
24+
import org.springframework.core.ParameterizedTypeReference;
25+
import org.springframework.http.MediaType;
26+
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
27+
import org.springframework.web.reactive.function.client.WebClient;
28+
29+
/**
30+
* Default {@link NodeLocator} using the Spring WebFlux {@link WebClient}.
31+
*
32+
* @author Gary Russell
33+
* @since 2.4.8
34+
*
35+
*/
36+
public class DefaultNodeLocator implements NodeLocator {
37+
38+
@Override
39+
public HashMap<String, Object> restCall(String username, String password, URI uri) {
40+
WebClient client = createClient(username, password);
41+
HashMap<String, Object> queueInfo = client.get()
42+
.uri(uri)
43+
.accept(MediaType.APPLICATION_JSON)
44+
.retrieve()
45+
.bodyToMono(new ParameterizedTypeReference<HashMap<String, Object>>() {
46+
})
47+
.block(Duration.ofSeconds(10)); // NOSONAR magic#
48+
return queueInfo;
49+
}
50+
51+
/**
52+
* Create a client instance.
53+
* @param username the username
54+
* @param password the password.
55+
* @return The client.
56+
*/
57+
protected WebClient createClient(String username, String password) {
58+
return WebClient.builder()
59+
.filter(ExchangeFilterFunctions.basicAuthentication(username, password))
60+
.build();
61+
}
62+
63+
}

0 commit comments

Comments
 (0)