Skip to content

Commit 6da38e7

Browse files
committed
spring-projectsGH-1480: Switch to CompletableFuture in s-r-stream
Resolves spring-projects#1480
1 parent 9e04fb1 commit 6da38e7

File tree

6 files changed

+336
-5
lines changed

6 files changed

+336
-5
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.rabbit.stream.producer;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
21+
import org.springframework.amqp.AmqpException;
22+
import org.springframework.amqp.core.Message;
23+
import org.springframework.amqp.core.MessagePostProcessor;
24+
import org.springframework.amqp.support.converter.MessageConverter;
25+
import org.springframework.lang.Nullable;
26+
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
27+
28+
import com.rabbitmq.stream.MessageBuilder;
29+
30+
/**
31+
* Provides methods for sending messages using a RabbitMQ Stream producer,
32+
* returning {@link CompletableFuture}.
33+
*
34+
* @author Gary Russell
35+
* @since 2.4.7
36+
*
37+
*/
38+
public interface RabbitStreamOperations2 extends AutoCloseable {
39+
40+
/**
41+
* Send a Spring AMQP message.
42+
* @param message the message.
43+
* @return a future to indicate success/failure.
44+
*/
45+
CompletableFuture<Boolean> send(Message message);
46+
47+
/**
48+
* Convert to and send a Spring AMQP message.
49+
* @param message the payload.
50+
* @return a future to indicate success/failure.
51+
*/
52+
CompletableFuture<Boolean> convertAndSend(Object message);
53+
54+
/**
55+
* Convert to and send a Spring AMQP message. If a {@link MessagePostProcessor} is
56+
* provided and returns {@code null}, the message is not sent and the future is
57+
* completed with {@code false}.
58+
* @param message the payload.
59+
* @param mpp a message post processor.
60+
* @return a future to indicate success/failure.
61+
*/
62+
CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);
63+
64+
/**
65+
* Send a native stream message.
66+
* @param message the message.
67+
* @return a future to indicate success/failure.
68+
* @see #messageBuilder()
69+
*/
70+
CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);
71+
72+
/**
73+
* Return the producer's {@link MessageBuilder} to create native stream messages.
74+
* @return the builder.
75+
* @see #send(com.rabbitmq.stream.Message)
76+
*/
77+
MessageBuilder messageBuilder();
78+
79+
/**
80+
* Return the message converter.
81+
* @return the converter.
82+
*/
83+
MessageConverter messageConverter();
84+
85+
/**
86+
* Return the stream message converter.
87+
* @return the converter;
88+
*/
89+
StreamMessageConverter streamMessageConverter();
90+
91+
@Override
92+
default void close() throws AmqpException {
93+
// narrow exception to avoid compiler warning - see
94+
// https://bugs.openjdk.java.net/browse/JDK-8155591
95+
}
96+
97+
}

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@
4242
*
4343
* @author Gary Russell
4444
* @since 2.4
45-
*
45+
* @deprecated in favor of {@link RabbitStreamTemplate2}.
4646
*/
47+
@Deprecated
4748
public class RabbitStreamTemplate implements RabbitStreamOperations, BeanNameAware {
4849

4950
protected final LogAccessor logger = new LogAccessor(getClass()); // NOSONAR
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.rabbit.stream.producer;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
21+
import org.springframework.amqp.core.Message;
22+
import org.springframework.amqp.core.MessagePostProcessor;
23+
import org.springframework.amqp.support.converter.MessageConverter;
24+
import org.springframework.amqp.support.converter.SimpleMessageConverter;
25+
import org.springframework.beans.factory.BeanNameAware;
26+
import org.springframework.core.log.LogAccessor;
27+
import org.springframework.lang.Nullable;
28+
import org.springframework.rabbit.stream.support.StreamMessageProperties;
29+
import org.springframework.rabbit.stream.support.converter.DefaultStreamMessageConverter;
30+
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
31+
import org.springframework.util.Assert;
32+
33+
import com.rabbitmq.stream.ConfirmationHandler;
34+
import com.rabbitmq.stream.Constants;
35+
import com.rabbitmq.stream.Environment;
36+
import com.rabbitmq.stream.MessageBuilder;
37+
import com.rabbitmq.stream.Producer;
38+
import com.rabbitmq.stream.ProducerBuilder;
39+
40+
/**
41+
* Default implementation of {@link RabbitStreamOperations}.
42+
*
43+
* @author Gary Russell
44+
* @since 2.4
45+
*
46+
*/
47+
public class RabbitStreamTemplate2 implements RabbitStreamOperations2, BeanNameAware {
48+
49+
protected final LogAccessor logger = new LogAccessor(getClass()); // NOSONAR
50+
51+
private final Environment environment;
52+
53+
private final String streamName;
54+
55+
private MessageConverter messageConverter = new SimpleMessageConverter();
56+
57+
private StreamMessageConverter streamConverter = new DefaultStreamMessageConverter();
58+
59+
private boolean streamConverterSet;
60+
61+
private Producer producer;
62+
63+
private String beanName;
64+
65+
private ProducerCustomizer producerCustomizer = (name, builder) -> { };
66+
67+
/**
68+
* Construct an instance with the provided {@link Environment}.
69+
* @param environment the environment.
70+
* @param streamName the stream name.
71+
*/
72+
public RabbitStreamTemplate2(Environment environment, String streamName) {
73+
Assert.notNull(environment, "'environment' cannot be null");
74+
Assert.notNull(streamName, "'streamName' cannot be null");
75+
this.environment = environment;
76+
this.streamName = streamName;
77+
}
78+
79+
80+
private synchronized Producer createOrGetProducer() {
81+
if (this.producer == null) {
82+
ProducerBuilder builder = this.environment.producerBuilder();
83+
builder.stream(this.streamName);
84+
this.producerCustomizer.accept(this.beanName, builder);
85+
this.producer = builder.build();
86+
if (!this.streamConverterSet) {
87+
((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier(
88+
() -> this.producer.messageBuilder());
89+
}
90+
}
91+
return this.producer;
92+
}
93+
94+
@Override
95+
public synchronized void setBeanName(String name) {
96+
this.beanName = name;
97+
}
98+
99+
/**
100+
* Set a converter for {@link #convertAndSend(Object)} operations.
101+
* @param messageConverter the converter.
102+
*/
103+
public void setMessageConverter(MessageConverter messageConverter) {
104+
Assert.notNull(messageConverter, "'messageConverter' cannot be null");
105+
this.messageConverter = messageConverter;
106+
}
107+
108+
/**
109+
* Set a converter to convert from {@link Message} to {@link com.rabbitmq.stream.Message}
110+
* for {@link #send(Message)} and {@link #convertAndSend(Object)} methods.
111+
* @param streamConverter the converter.
112+
*/
113+
public synchronized void setStreamConverter(StreamMessageConverter streamConverter) {
114+
Assert.notNull(streamConverter, "'streamConverter' cannot be null");
115+
this.streamConverter = streamConverter;
116+
this.streamConverterSet = true;
117+
}
118+
119+
/**
120+
* Used to customize the {@link ProducerBuilder} before the {@link Producer} is built.
121+
* @param producerCustomizer the customizer;
122+
*/
123+
public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
124+
Assert.notNull(producerCustomizer, "'producerCustomizer' cannot be null");
125+
this.producerCustomizer = producerCustomizer;
126+
}
127+
128+
@Override
129+
public MessageConverter messageConverter() {
130+
return this.messageConverter;
131+
}
132+
133+
134+
@Override
135+
public StreamMessageConverter streamMessageConverter() {
136+
return this.streamConverter;
137+
}
138+
139+
140+
@Override
141+
public CompletableFuture<Boolean> send(Message message) {
142+
CompletableFuture<Boolean> future = new CompletableFuture<>();
143+
createOrGetProducer().send(this.streamConverter.fromMessage(message), handleConfirm(future));
144+
return future;
145+
}
146+
147+
@Override
148+
public CompletableFuture<Boolean> convertAndSend(Object message) {
149+
return convertAndSend(message, null);
150+
}
151+
152+
@Override
153+
public CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp) {
154+
Message message2 = this.messageConverter.toMessage(message, new StreamMessageProperties());
155+
Assert.notNull(message2, "The message converter returned null");
156+
if (mpp != null) {
157+
message2 = mpp.postProcessMessage(message2);
158+
if (message2 == null) {
159+
this.logger.debug("Message Post Processor returned null, message not sent");
160+
CompletableFuture<Boolean> future = new CompletableFuture<>();
161+
future.complete(false);
162+
return future;
163+
}
164+
}
165+
return send(message2);
166+
}
167+
168+
169+
@Override
170+
public CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message) {
171+
CompletableFuture<Boolean> future = new CompletableFuture<>();
172+
createOrGetProducer().send(message, handleConfirm(future));
173+
return future;
174+
}
175+
176+
@Override
177+
public MessageBuilder messageBuilder() {
178+
return createOrGetProducer().messageBuilder();
179+
}
180+
181+
private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future) {
182+
return confStatus -> {
183+
if (confStatus.isConfirmed()) {
184+
future.complete(true);
185+
}
186+
else {
187+
int code = confStatus.getCode();
188+
String errorMessage;
189+
switch (code) {
190+
case Constants.CODE_MESSAGE_ENQUEUEING_FAILED:
191+
errorMessage = "Message Enqueueing Failed";
192+
break;
193+
case Constants.CODE_PRODUCER_CLOSED:
194+
errorMessage = "Producer Closed";
195+
break;
196+
case Constants.CODE_PRODUCER_NOT_AVAILABLE:
197+
errorMessage = "Producer Not Available";
198+
break;
199+
case Constants.CODE_PUBLISH_CONFIRM_TIMEOUT:
200+
errorMessage = "Publish Confirm Timeout";
201+
break;
202+
default:
203+
errorMessage = "Unknown code: " + code;
204+
break;
205+
}
206+
future.completeExceptionally(new StreamSendException(errorMessage, code));
207+
}
208+
};
209+
}
210+
211+
/**
212+
* {@inheritDoc}
213+
* <p>
214+
* <b>Close the underlying producer; a new producer will be created on the next
215+
* operation that requires one.</b>
216+
*/
217+
@Override
218+
public synchronized void close() {
219+
if (this.producer != null) {
220+
this.producer.close();
221+
this.producer = null;
222+
}
223+
}
224+
225+
}

spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import org.springframework.context.annotation.Configuration;
4343
import org.springframework.context.annotation.DependsOn;
4444
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
45-
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
45+
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate2;
4646
import org.springframework.rabbit.stream.retry.StreamRetryOperationsInterceptorFactoryBean;
4747
import org.springframework.rabbit.stream.support.StreamMessageProperties;
4848
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
@@ -70,7 +70,7 @@ public class RabbitListenerTests extends AbstractIntegrationTests {
7070
Config config;
7171

7272
@Test
73-
void simple(@Autowired RabbitStreamTemplate template) throws Exception {
73+
void simple(@Autowired RabbitStreamTemplate2 template) throws Exception {
7474
Future<Boolean> future = template.convertAndSend("foo");
7575
assertThat(future.get(10, TimeUnit.SECONDS)).isTrue();
7676
future = template.convertAndSend("bar", msg -> msg);
@@ -247,8 +247,8 @@ RabbitTemplate template(CachingConnectionFactory cf) {
247247
}
248248

249249
@Bean
250-
RabbitStreamTemplate streamTemplate1(Environment env) {
251-
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
250+
RabbitStreamTemplate2 streamTemplate1(Environment env) {
251+
RabbitStreamTemplate2 template = new RabbitStreamTemplate2(env, "test.stream.queue1");
252252
template.setProducerCustomizer((name, builder) -> builder.name("test"));
253253
return template;
254254
}

src/reference/asciidoc/stream.adoc

+3
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ The `ProducerCustomizer` provides a mechanism to customize the producer before i
6767

6868
Refer to the https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/[Java Client Documentation] about customizing the `Environment` and `Producer`.
6969

70+
IMPORTANT: In version 2.4.7 `RabbitStreamOperations` and `RabbitStreamTemplate` have been deprecated in favor of `RabbitStreamOperations2` and `RabbitStreamTemplate2` respectively; they return `CompletableFuture` instead of `ListenableFuture`.
71+
`RabbitStreamOperations` and `RabbitStreamTemplate` will be removed in 3.0.
72+
7073
==== Receiving Messages
7174

7275
Asynchronous message reception is provided by the `StreamListenerContainer` (and the `StreamRabbitListenerContainerFactory` when using `@RabbitListener`).

src/reference/asciidoc/whats-new.adoc

+5
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,8 @@ See <<json-message-converter>> for more information.
3333

3434
The `AsyncRabbitTemplate` is deprecated in favor of `AsyncRabbitTemplate2` which returns `CompletableFuture` s instead of `ListenableFuture` s.
3535
See <<async-template>> for more information.
36+
37+
==== Stream Support Changes
38+
39+
`RabbitStreamOperations` and `RabbitStreamTemplate` have been deprecated in favor of `RabbitStreamOperations2` and `RabbitStreamTemplate2` respectively; they return `CompletableFuture` instead of `ListenableFuture`.
40+
See <<stream-support>> for more information.

0 commit comments

Comments
 (0)