Skip to content

Commit 6e3e246

Browse files
garyrussellartembilan
authored andcommitted
GH-1419: Remove RabbitMQ http-client Usage
Resolves #1419 Use Spring WebFlux instead, while allowing the user to choose some other technology in the `LocalizedQueueConnectionFactory`. * Rename DefaultNodeLocator; add generics. * Remove unnecessary dependencies. GH-1419: Add RestTemplateNodeLocator - also remove hard dependency on `spring-webflux` from `spring-rabbit-junit`. Fix Javadoc. Use RestTemplate for aliveness test; JVM HttpClient not available in Java 8. Restore spring-rabbit-junit jackson dependency.
1 parent 7ea8fc7 commit 6e3e246

File tree

20 files changed

+680
-381
lines changed

20 files changed

+680
-381
lines changed

build.gradle

+11-7
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ ext {
6060
rabbitmqStreamVersion = '0.4.0'
6161
rabbitmqVersion = project.hasProperty('rabbitmqVersion') ? project.rabbitmqVersion : '5.13.1'
6262
rabbitmqHttpClientVersion = '3.12.1'
63-
reactorVersion = '2020.0.20'
63+
reactorVersion = '2020.0.24'
6464
snappyVersion = '1.1.8.4'
6565
springDataCommonsVersion = '2.6.7'
6666
springVersion = project.hasProperty('springVersion') ? project.springVersion : '5.3.23'
@@ -366,7 +366,10 @@ project('spring-rabbit') {
366366
api 'org.springframework:spring-context'
367367
api 'org.springframework:spring-messaging'
368368
api 'org.springframework:spring-tx'
369+
optionalApi 'org.springframework:spring-webflux'
370+
optionalApi "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion"
369371
optionalApi 'io.projectreactor:reactor-core'
372+
optionalApi 'io.projectreactor.netty:reactor-netty-http'
370373
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
371374
optionalApi 'org.apache.logging.log4j:log4j-core'
372375
optionalApi "io.micrometer:micrometer-core:$micrometerVersion"
@@ -380,10 +383,10 @@ project('spring-rabbit') {
380383
testApi project(':spring-rabbit-junit')
381384
testImplementation("com.willowtreeapps.assertk:assertk-jvm:$assertkVersion")
382385
testImplementation "org.hibernate.validator:hibernate-validator:$hibernateValidationVersion"
383-
testRuntimeOnly 'org.springframework:spring-web'
386+
testRuntimeOnly 'org.springframework:spring-webflux'
384387
testRuntimeOnly "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion"
385388
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core'
386-
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
389+
testImplementation 'com.fasterxml.jackson.core:jackson-databind'
387390
testRuntimeOnly 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
388391
testRuntimeOnly 'com.fasterxml.jackson.module:jackson-module-kotlin'
389392
testRuntimeOnly ("junit:junit:$junit4Version") {
@@ -407,7 +410,6 @@ project('spring-rabbit-stream') {
407410

408411
api project(':spring-rabbit')
409412
api "com.rabbitmq:stream-client:$rabbitmqStreamVersion"
410-
optionalApi "com.rabbitmq:http-client:$rabbitmqHttpClientVersion"
411413

412414
testApi project(':spring-rabbit-junit')
413415
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core'
@@ -419,8 +421,11 @@ project('spring-rabbit-stream') {
419421
testRuntimeOnly "org.xerial.snappy:snappy-java:$snappyVersion"
420422
testRuntimeOnly "org.lz4:lz4-java:$lz4Version"
421423
testRuntimeOnly "com.github.luben:zstd-jni:$zstdJniVersion"
424+
testRuntimeOnly 'io.projectreactor.netty:reactor-netty-http'
422425
testImplementation "org.testcontainers:rabbitmq:1.15.3"
423426
testImplementation "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
427+
testImplementation 'org.springframework:spring-webflux'
428+
testImplementation 'io.projectreactor.netty:reactor-netty-http'
424429
}
425430

426431
}
@@ -436,12 +441,11 @@ project('spring-rabbit-junit') {
436441
exclude group: 'org.hamcrest', module: 'hamcrest-core'
437442
}
438443
api "com.rabbitmq:amqp-client:$rabbitmqVersion"
439-
api ("com.rabbitmq:http-client:$rabbitmqHttpClientVersion") {
440-
exclude group: 'org.springframework', module: 'spring-web'
441-
}
442444
api 'org.springframework:spring-web'
443445
api 'org.junit.jupiter:junit-jupiter-api'
444446
api "org.assertj:assertj-core:$assertjVersion"
447+
api "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion"
448+
api 'com.fasterxml.jackson.core:jackson-databind'
445449
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
446450
optionalApi 'org.apache.logging.log4j:log4j-core'
447451
compileOnly 'org.apiguardian:apiguardian-api:1.0.0'

spring-amqp/src/main/java/org/springframework/amqp/support/converter/SerializerMessageConverter.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,7 +27,6 @@
2727
import org.springframework.amqp.core.MessageProperties;
2828
import org.springframework.beans.DirectFieldAccessor;
2929
import org.springframework.core.ConfigurableObjectInputStream;
30-
import org.springframework.core.NestedIOException;
3130
import org.springframework.core.serializer.DefaultDeserializer;
3231
import org.springframework.core.serializer.DefaultSerializer;
3332
import org.springframework.core.serializer.Deserializer;
@@ -182,7 +181,7 @@ protected Class<?> resolveClass(ObjectStreamClass classDesc)
182181
return objectInputStream.readObject();
183182
}
184183
catch (ClassNotFoundException ex) {
185-
throw new NestedIOException("Failed to deserialize object type", ex);
184+
throw new IOException("Failed to deserialize object type", ex);
186185
}
187186
}
188187

spring-amqp/src/main/java/org/springframework/amqp/utils/SerializationUtils.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2019 the original author or authors.
2+
* Copyright 2006-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,7 +26,6 @@
2626
import java.util.Set;
2727

2828
import org.springframework.core.ConfigurableObjectInputStream;
29-
import org.springframework.core.NestedIOException;
3029
import org.springframework.util.ObjectUtils;
3130
import org.springframework.util.PatternMatchUtils;
3231

@@ -126,7 +125,7 @@ protected Class<?> resolveClass(ObjectStreamClass classDesc)
126125
return objectInputStream.readObject();
127126
}
128127
catch (ClassNotFoundException ex) {
129-
throw new NestedIOException("Failed to deserialize object type", ex);
128+
throw new IOException("Failed to deserialize object type", ex);
130129
}
131130
}
132131

spring-amqp/src/test/java/org/springframework/amqp/support/converter/SerializerMessageConverterTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323

2424
import java.io.ByteArrayInputStream;
2525
import java.io.ByteArrayOutputStream;
26+
import java.io.IOException;
2627
import java.io.InputStream;
2728
import java.io.ObjectInputStream;
2829
import java.io.ObjectOutputStream;
@@ -33,7 +34,6 @@
3334
import org.springframework.amqp.core.Message;
3435
import org.springframework.amqp.core.MessageProperties;
3536
import org.springframework.amqp.utils.test.TestUtils;
36-
import org.springframework.core.NestedIOException;
3737
import org.springframework.core.serializer.DefaultDeserializer;
3838
import org.springframework.core.serializer.Deserializer;
3939

@@ -178,7 +178,7 @@ public void messageConversionExceptionForClassNotFound() throws Exception {
178178
body[10] = 'z';
179179
assertThatThrownBy(() -> converter.fromMessage(message))
180180
.isExactlyInstanceOf(MessageConversionException.class)
181-
.hasCauseExactlyInstanceOf(NestedIOException.class);
181+
.hasCauseExactlyInstanceOf(IOException.class);
182182
}
183183

184184
}

spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/BrokerRunningSupport.java

+45-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,8 +17,10 @@
1717
package org.springframework.amqp.rabbit.junit;
1818

1919
import java.io.IOException;
20+
import java.net.URI;
2021
import java.net.URISyntaxException;
2122
import java.nio.ByteBuffer;
23+
import java.nio.charset.StandardCharsets;
2224
import java.util.ArrayList;
2325
import java.util.Arrays;
2426
import java.util.HashMap;
@@ -29,14 +31,28 @@
2931

3032
import org.apache.commons.logging.Log;
3133
import org.apache.commons.logging.LogFactory;
32-
34+
import org.apache.http.HttpHost;
35+
import org.apache.http.client.AuthCache;
36+
import org.apache.http.client.protocol.HttpClientContext;
37+
import org.apache.http.impl.auth.BasicScheme;
38+
import org.apache.http.impl.client.BasicAuthCache;
39+
import org.apache.http.protocol.BasicHttpContext;
40+
import org.apache.http.protocol.HttpContext;
41+
42+
import org.springframework.http.HttpMethod;
43+
import org.springframework.http.HttpStatus;
44+
import org.springframework.http.ResponseEntity;
45+
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
46+
import org.springframework.http.client.support.BasicAuthenticationInterceptor;
47+
import org.springframework.lang.Nullable;
3348
import org.springframework.util.Base64Utils;
3449
import org.springframework.util.StringUtils;
50+
import org.springframework.web.client.RestTemplate;
51+
import org.springframework.web.util.UriUtils;
3552

3653
import com.rabbitmq.client.Channel;
3754
import com.rabbitmq.client.Connection;
3855
import com.rabbitmq.client.ConnectionFactory;
39-
import com.rabbitmq.http.client.Client;
4056

4157
/**
4258
* A class that can be used to prevent integration tests from failing if the Rabbit broker application is
@@ -372,15 +388,39 @@ private Channel createQueues(Connection connection) throws IOException, URISynta
372388
}
373389
}
374390
if (this.management) {
375-
Client client = new Client(getAdminUri(), this.adminUser, this.adminPassword);
376-
if (!client.alivenessTest("/")) {
391+
if (!alivenessTest()) {
377392
throw new BrokerNotAliveException("Aliveness test failed for localhost:15672 guest/quest; "
378393
+ "management not available");
379394
}
380395
}
381396
return channel;
382397
}
383398

399+
private boolean alivenessTest() throws URISyntaxException {
400+
URI uri = new URI(getAdminUri())
401+
.resolve("/api/aliveness-test/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8));
402+
HttpHost host = new HttpHost(uri.getHost(), uri.getPort());
403+
RestTemplate template = new RestTemplate(new HttpComponentsClientHttpRequestFactory() {
404+
405+
@Override
406+
@Nullable
407+
protected HttpContext createHttpContext(HttpMethod httpMethod, URI uri) {
408+
AuthCache cache = new BasicAuthCache();
409+
BasicScheme scheme = new BasicScheme();
410+
cache.put(host, scheme);
411+
BasicHttpContext context = new BasicHttpContext();
412+
context.setAttribute(HttpClientContext.AUTH_CACHE, cache);
413+
return context;
414+
}
415+
416+
});
417+
template.getInterceptors().add(new BasicAuthenticationInterceptor(this.adminUser, this.adminPassword));
418+
ResponseEntity<String> response = template.exchange(uri, HttpMethod.GET, null, String.class);
419+
return response.getStatusCode().equals(HttpStatus.OK)
420+
? response.getBody().equals("{\"status\":\"ok\"}")
421+
: false;
422+
}
423+
384424
public static boolean fatal() {
385425
String serversRequired = System.getenv(BROKER_REQUIRED);
386426
if (Boolean.parseBoolean(serversRequired)) {

spring-rabbit-junit/src/test/java/org/springframework/amqp/rabbit/junit/RabbitAvailableTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,7 +30,7 @@
3030
* @since 2.0.2
3131
*
3232
*/
33-
@RabbitAvailable(queues = "rabbitAvailableTests.queue")
33+
@RabbitAvailable(queues = "rabbitAvailableTests.queue", management = true)
3434
public class RabbitAvailableTests {
3535

3636
@Test

spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java

+40-5
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,13 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.net.URI;
22+
import java.net.URISyntaxException;
23+
import java.nio.charset.StandardCharsets;
24+
import java.time.Duration;
2125
import java.util.ArrayList;
2226
import java.util.List;
27+
import java.util.Map;
2328
import java.util.concurrent.CountDownLatch;
2429
import java.util.concurrent.Future;
2530
import java.util.concurrent.TimeUnit;
@@ -41,16 +46,19 @@
4146
import org.springframework.context.annotation.Bean;
4247
import org.springframework.context.annotation.Configuration;
4348
import org.springframework.context.annotation.DependsOn;
49+
import org.springframework.core.ParameterizedTypeReference;
50+
import org.springframework.http.MediaType;
4451
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
4552
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
4653
import org.springframework.rabbit.stream.retry.StreamRetryOperationsInterceptorFactoryBean;
4754
import org.springframework.rabbit.stream.support.StreamMessageProperties;
4855
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
4956
import org.springframework.test.annotation.DirtiesContext;
5057
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
58+
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
59+
import org.springframework.web.reactive.function.client.WebClient;
60+
import org.springframework.web.util.UriUtils;
5161

52-
import com.rabbitmq.http.client.Client;
53-
import com.rabbitmq.http.client.domain.QueueInfo;
5462
import com.rabbitmq.stream.Address;
5563
import com.rabbitmq.stream.Environment;
5664
import com.rabbitmq.stream.Message;
@@ -97,11 +105,38 @@ void nativeMsg(@Autowired RabbitTemplate template) throws InterruptedException {
97105
assertThat(this.config.latch4.await(10, TimeUnit.SECONDS)).isTrue();
98106
}
99107

108+
@SuppressWarnings("unchecked")
100109
@Test
101110
void queueOverAmqp() throws Exception {
102-
Client client = new Client("http://guest:guest@localhost:" + managementPort() + "/api");
103-
QueueInfo queue = client.getQueue("/", "stream.created.over.amqp");
104-
assertThat(queue.getArguments().get("x-queue-type")).isEqualTo("stream");
111+
WebClient client = WebClient.builder()
112+
.filter(ExchangeFilterFunctions.basicAuthentication("guest", "guest"))
113+
.build();
114+
Map<String, Object> queue = queueInfo("stream.created.over.amqp");
115+
assertThat(((Map<String, Object>) queue.get("arguments")).get("x-queue-type")).isEqualTo("stream");
116+
}
117+
118+
private Map<String, Object> queueInfo(String queueName) throws URISyntaxException {
119+
WebClient client = createClient("guest", "guest");
120+
URI uri = queueUri(queueName);
121+
return client.get()
122+
.uri(uri)
123+
.accept(MediaType.APPLICATION_JSON)
124+
.retrieve()
125+
.bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {
126+
})
127+
.block(Duration.ofSeconds(10));
128+
}
129+
130+
private URI queueUri(String queue) throws URISyntaxException {
131+
URI uri = new URI("http://localhost:" + managementPort() + "/api")
132+
.resolve("/api/queues/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8) + "/" + queue);
133+
return uri;
134+
}
135+
136+
private WebClient createClient(String adminUser, String adminPassword) {
137+
return WebClient.builder()
138+
.filter(ExchangeFilterFunctions.basicAuthentication(adminUser, adminPassword))
139+
.build();
105140
}
106141

107142
@Configuration(proxyBeanMethods = false)

0 commit comments

Comments
 (0)