Skip to content

Commit c143c5b

Browse files
authored
GH-1419: Remove RabbitMQ http-client Usage
Resolves #1419 Use Spring WebFlux instead, while allowing the user to choose some other technology in the `LocalizedQueueConnectionFactory`. * Rename DefaultNodeLocator; add generics. * Remove unnecessary dependencies.
1 parent 639eb16 commit c143c5b

File tree

16 files changed

+498
-399
lines changed

16 files changed

+498
-399
lines changed

build.gradle

+4-12
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ ext {
4444
assertkVersion = '0.24'
4545
awaitilityVersion = '4.2.0'
4646
commonsCompressVersion = '1.20'
47-
commonsHttpClientVersion = '4.5.13'
4847
commonsPoolVersion = '2.11.1'
4948
googleJsr305Version = '3.0.2'
5049
hamcrestVersion = '2.2'
@@ -62,7 +61,6 @@ ext {
6261
mockitoVersion = '4.8.0'
6362
rabbitmqStreamVersion = '0.8.0'
6463
rabbitmqVersion = project.hasProperty('rabbitmqVersion') ? project.rabbitmqVersion : '5.16.0'
65-
rabbitmqHttpClientVersion = '3.12.1'
6664
reactorVersion = '2022.0.0-SNAPSHOT'
6765
snappyVersion = '1.1.8.4'
6866
springDataVersion = '2022.0.0-SNAPSHOT'
@@ -384,11 +382,11 @@ project('spring-rabbit') {
384382

385383
api project(':spring-amqp')
386384
api "com.rabbitmq:amqp-client:$rabbitmqVersion"
387-
optionalApi "com.rabbitmq:http-client:$rabbitmqHttpClientVersion"
388385
optionalApi 'org.springframework:spring-aop'
389386
api 'org.springframework:spring-context'
390387
api 'org.springframework:spring-messaging'
391388
api 'org.springframework:spring-tx'
389+
optionalApi 'org.springframework:spring-webflux'
392390
optionalApi 'io.projectreactor:reactor-core'
393391
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
394392
optionalApi 'org.apache.logging.log4j:log4j-core'
@@ -409,8 +407,6 @@ project('spring-rabbit') {
409407
testImplementation 'io.micrometer:micrometer-tracing-bridge-brave'
410408
testImplementation 'io.micrometer:micrometer-tracing-test'
411409
testImplementation 'io.micrometer:micrometer-tracing-integration-test'
412-
testRuntimeOnly 'org.springframework:spring-web'
413-
testRuntimeOnly "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion"
414410
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core'
415411
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
416412
testRuntimeOnly 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
@@ -465,14 +461,12 @@ project('spring-rabbit-stream') {
465461

466462
api project(':spring-rabbit')
467463
api "com.rabbitmq:stream-client:$rabbitmqStreamVersion"
468-
optionalApi "com.rabbitmq:http-client:$rabbitmqHttpClientVersion"
469464

470465
testApi project(':spring-rabbit-junit')
471466
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core'
472467
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
473468
testRuntimeOnly 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
474469
testRuntimeOnly 'com.fasterxml.jackson.module:jackson-module-kotlin'
475-
testRuntimeOnly "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion"
476470
testRuntimeOnly "org.apache.commons:commons-compress:$commonsCompressVersion"
477471
testRuntimeOnly "org.xerial.snappy:snappy-java:$snappyVersion"
478472
testRuntimeOnly "org.lz4:lz4-java:$lz4Version"
@@ -494,16 +488,14 @@ project('spring-rabbit-junit') {
494488
exclude group: 'org.hamcrest', module: 'hamcrest-core'
495489
}
496490
api "com.rabbitmq:amqp-client:$rabbitmqVersion"
497-
api ("com.rabbitmq:http-client:$rabbitmqHttpClientVersion") {
498-
exclude group: 'org.springframework', module: 'spring-web'
499-
}
500-
api 'org.springframework:spring-web'
491+
api 'org.springframework:spring-webflux'
501492
api 'org.junit.jupiter:junit-jupiter-api'
502493
api "org.assertj:assertj-core:$assertjVersion"
503494
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
504495
optionalApi 'org.apache.logging.log4j:log4j-core'
505496
compileOnly 'org.apiguardian:apiguardian-api:1.0.0'
506-
497+
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core'
498+
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
507499
}
508500

509501
}

spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/BrokerRunningSupport.java

+29-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,8 +17,11 @@
1717
package org.springframework.amqp.rabbit.junit;
1818

1919
import java.io.IOException;
20+
import java.net.URI;
2021
import java.net.URISyntaxException;
2122
import java.nio.ByteBuffer;
23+
import java.nio.charset.StandardCharsets;
24+
import java.time.Duration;
2225
import java.util.ArrayList;
2326
import java.util.Arrays;
2427
import java.util.HashMap;
@@ -30,13 +33,17 @@
3033
import org.apache.commons.logging.Log;
3134
import org.apache.commons.logging.LogFactory;
3235

36+
import org.springframework.core.ParameterizedTypeReference;
37+
import org.springframework.http.MediaType;
3338
import org.springframework.util.Base64Utils;
3439
import org.springframework.util.StringUtils;
40+
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
41+
import org.springframework.web.reactive.function.client.WebClient;
42+
import org.springframework.web.util.UriUtils;
3543

3644
import com.rabbitmq.client.Channel;
3745
import com.rabbitmq.client.Connection;
3846
import com.rabbitmq.client.ConnectionFactory;
39-
import com.rabbitmq.http.client.Client;
4047

4148
/**
4249
* A class that can be used to prevent integration tests from failing if the Rabbit broker application is
@@ -372,15 +379,33 @@ private Channel createQueues(Connection connection) throws IOException, URISynta
372379
}
373380
}
374381
if (this.management) {
375-
Client client = new Client(getAdminUri(), this.adminUser, this.adminPassword);
376-
if (!client.alivenessTest("/")) {
382+
if (!alivenessTest()) {
377383
throw new BrokerNotAliveException("Aliveness test failed for localhost:15672 guest/quest; "
378384
+ "management not available");
379385
}
380386
}
381387
return channel;
382388
}
383389

390+
private boolean alivenessTest() throws URISyntaxException {
391+
WebClient client = WebClient.builder()
392+
.filter(ExchangeFilterFunctions.basicAuthentication(this.adminUser, this.adminPassword))
393+
.build();
394+
URI uri = new URI(getAdminUri())
395+
.resolve("/api/aliveness-test/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8));
396+
HashMap<String, String> result = client.get()
397+
.uri(uri)
398+
.accept(MediaType.APPLICATION_JSON)
399+
.retrieve()
400+
.bodyToMono(new ParameterizedTypeReference<HashMap<String, String>>() {
401+
})
402+
.block(Duration.ofSeconds(10)); // NOSONAR magic#
403+
if (result != null) {
404+
return result.get("status").equals("ok");
405+
}
406+
return false;
407+
}
408+
384409
public static boolean fatal() {
385410
String serversRequired = System.getenv(BROKER_REQUIRED);
386411
if (Boolean.parseBoolean(serversRequired)) {

spring-rabbit-junit/src/test/java/org/springframework/amqp/rabbit/junit/RabbitAvailableTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,7 +30,7 @@
3030
* @since 2.0.2
3131
*
3232
*/
33-
@RabbitAvailable(queues = "rabbitAvailableTests.queue")
33+
@RabbitAvailable(queues = "rabbitAvailableTests.queue", management = true)
3434
public class RabbitAvailableTests {
3535

3636
@Test

spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java

+40-7
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.net.URI;
22+
import java.net.URISyntaxException;
23+
import java.nio.charset.StandardCharsets;
24+
import java.time.Duration;
2125
import java.util.ArrayList;
2226
import java.util.List;
27+
import java.util.Map;
2328
import java.util.concurrent.CountDownLatch;
2429
import java.util.concurrent.Future;
2530
import java.util.concurrent.TimeUnit;
2631
import java.util.concurrent.atomic.AtomicBoolean;
2732

28-
import org.junit.jupiter.api.Disabled;
2933
import org.junit.jupiter.api.Test;
3034

3135
import org.springframework.amqp.core.Queue;
@@ -42,6 +46,8 @@
4246
import org.springframework.context.annotation.Bean;
4347
import org.springframework.context.annotation.Configuration;
4448
import org.springframework.context.annotation.DependsOn;
49+
import org.springframework.core.ParameterizedTypeReference;
50+
import org.springframework.http.MediaType;
4551
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
4652
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
4753
import org.springframework.rabbit.stream.retry.StreamRetryOperationsInterceptorFactoryBean;
@@ -50,9 +56,10 @@
5056
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
5157
import org.springframework.test.annotation.DirtiesContext;
5258
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
59+
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
60+
import org.springframework.web.reactive.function.client.WebClient;
61+
import org.springframework.web.util.UriUtils;
5362

54-
import com.rabbitmq.http.client.Client;
55-
import com.rabbitmq.http.client.domain.QueueInfo;
5663
import com.rabbitmq.stream.Address;
5764
import com.rabbitmq.stream.Environment;
5865
import com.rabbitmq.stream.Message;
@@ -99,12 +106,38 @@ void nativeMsg(@Autowired RabbitTemplate template) throws InterruptedException {
99106
assertThat(this.config.latch4.await(10, TimeUnit.SECONDS)).isTrue();
100107
}
101108

109+
@SuppressWarnings("unchecked")
102110
@Test
103-
@Disabled("Temporary until SF uses Micrometer snaps")
104111
void queueOverAmqp() throws Exception {
105-
Client client = new Client("http://guest:guest@localhost:" + managementPort() + "/api");
106-
QueueInfo queue = client.getQueue("/", "stream.created.over.amqp");
107-
assertThat(queue.getArguments().get("x-queue-type")).isEqualTo("stream");
112+
WebClient client = WebClient.builder()
113+
.filter(ExchangeFilterFunctions.basicAuthentication("guest", "guest"))
114+
.build();
115+
Map<String, Object> queue = queueInfo("stream.created.over.amqp");
116+
assertThat(((Map<String, Object>) queue.get("arguments")).get("x-queue-type")).isEqualTo("stream");
117+
}
118+
119+
private Map<String, Object> queueInfo(String queueName) throws URISyntaxException {
120+
WebClient client = createClient("guest", "guest");
121+
URI uri = queueUri(queueName);
122+
return client.get()
123+
.uri(uri)
124+
.accept(MediaType.APPLICATION_JSON)
125+
.retrieve()
126+
.bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {
127+
})
128+
.block(Duration.ofSeconds(10));
129+
}
130+
131+
private URI queueUri(String queue) throws URISyntaxException {
132+
URI uri = new URI("http://localhost:" + managementPort() + "/api")
133+
.resolve("/api/queues/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8) + "/" + queue);
134+
return uri;
135+
}
136+
137+
private WebClient createClient(String adminUser, String adminPassword) {
138+
return WebClient.builder()
139+
.filter(ExchangeFilterFunctions.basicAuthentication(adminUser, adminPassword))
140+
.build();
108141
}
109142

110143
@Configuration(proxyBeanMethods = false)

0 commit comments

Comments
 (0)