Skip to content

Commit 994a4c9

Browse files
committed
spring-projectsGH-1473: Move RabbitFutures to Top Level Classes
- to aid migration from 2.4.x to 3.0.x so that the return types will not change
1 parent 439ccd1 commit 994a4c9

File tree

8 files changed

+324
-176
lines changed

8 files changed

+324
-176
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate2.java

Lines changed: 50 additions & 163 deletions
Large diffs are not rendered by default.
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit;
18+
19+
import java.util.concurrent.ScheduledFuture;
20+
import java.util.function.BiConsumer;
21+
import java.util.function.Function;
22+
23+
import org.springframework.amqp.core.Message;
24+
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder;
25+
import org.springframework.core.ParameterizedTypeReference;
26+
27+
/**
28+
* A {@link RabbitFuture} with a return type of the template's
29+
* generic parameter.
30+
* @param <C> the type.
31+
* @since 1.6
32+
*/
33+
public class RabbitConverterFuture<C> extends RabbitFuture<C> {
34+
35+
private volatile ParameterizedTypeReference<C> returnType;
36+
37+
RabbitConverterFuture(String correlationId, Message requestMessage,
38+
BiConsumer<String, ChannelHolder> canceler,
39+
Function<RabbitFuture<?>, ScheduledFuture<?>> timeoutTaskFunction) {
40+
41+
super(correlationId, requestMessage, canceler, timeoutTaskFunction);
42+
}
43+
44+
public ParameterizedTypeReference<C> getReturnType() {
45+
return this.returnType;
46+
}
47+
48+
public void setReturnType(ParameterizedTypeReference<C> returnType) {
49+
this.returnType = returnType;
50+
}
51+
52+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.ScheduledFuture;
21+
import java.util.function.BiConsumer;
22+
import java.util.function.Function;
23+
24+
import org.springframework.amqp.core.Message;
25+
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder;
26+
27+
/**
28+
* Base class for {@link CompletableFuture}s returned by {@link AsyncRabbitTemplate2}.
29+
* @param <T> the type.
30+
* @since 2.4.7
31+
*/
32+
public abstract class RabbitFuture<T> extends CompletableFuture<T> {
33+
34+
private final String correlationId;
35+
36+
private final Message requestMessage;
37+
38+
private final BiConsumer<String, ChannelHolder> canceler;
39+
40+
private final Function<RabbitFuture<?>, ScheduledFuture<?>> timeoutTaskFunction;
41+
42+
private ScheduledFuture<?> timeoutTask;
43+
44+
private volatile CompletableFuture<Boolean> confirm;
45+
46+
private String nackCause;
47+
48+
private ChannelHolder channelHolder;
49+
50+
protected RabbitFuture(String correlationId, Message requestMessage, BiConsumer<String, ChannelHolder> canceler,
51+
Function<RabbitFuture<?>, ScheduledFuture<?>> timeoutTaskFunction) {
52+
53+
this.correlationId = correlationId;
54+
this.requestMessage = requestMessage;
55+
this.canceler = canceler;
56+
this.timeoutTaskFunction = timeoutTaskFunction;
57+
}
58+
59+
void setChannelHolder(ChannelHolder channel) {
60+
this.channelHolder = channel;
61+
}
62+
63+
String getCorrelationId() {
64+
return this.correlationId;
65+
}
66+
67+
ChannelHolder getChannelHolder() {
68+
return this.channelHolder;
69+
}
70+
71+
Message getRequestMessage() {
72+
return this.requestMessage;
73+
}
74+
75+
@Override
76+
public boolean cancel(boolean mayInterruptIfRunning) {
77+
if (this.timeoutTask != null) {
78+
this.timeoutTask.cancel(true);
79+
}
80+
this.canceler.accept(this.correlationId, this.channelHolder);
81+
return super.cancel(mayInterruptIfRunning);
82+
}
83+
84+
/**
85+
* When confirms are enabled contains a {@link CompletableFuture}
86+
* for the confirmation.
87+
* @return the future.
88+
*/
89+
public CompletableFuture<Boolean> getConfirm() {
90+
return this.confirm;
91+
}
92+
93+
void setConfirm(CompletableFuture<Boolean> confirm) {
94+
this.confirm = confirm;
95+
}
96+
97+
/**
98+
* When confirms are enabled and a nack is received, contains
99+
* the cause for the nack, if any.
100+
* @return the cause.
101+
*/
102+
public String getNackCause() {
103+
return this.nackCause;
104+
}
105+
106+
void setNackCause(String nackCause) {
107+
this.nackCause = nackCause;
108+
}
109+
110+
void startTimer() {
111+
this.timeoutTask = this.timeoutTaskFunction.apply(this);
112+
}
113+
114+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit;
18+
19+
import java.util.concurrent.ScheduledFuture;
20+
import java.util.function.BiConsumer;
21+
import java.util.function.Function;
22+
23+
import org.springframework.amqp.core.Message;
24+
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder;
25+
26+
/**
27+
* A {@link RabbitFuture} with a return type of {@link Message}.
28+
* @since 2.4.7
29+
*/
30+
public class RabbitMessageFuture extends RabbitFuture<Message> {
31+
32+
RabbitMessageFuture(String correlationId, Message requestMessage, BiConsumer<String, ChannelHolder> canceler,
33+
Function<RabbitFuture<?>, ScheduledFuture<?>> timeoutTaskFunction) {
34+
35+
super(correlationId, requestMessage, canceler, timeoutTaskFunction);
36+
}
37+
38+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit;
18+
19+
import java.util.concurrent.ConcurrentMap;
20+
21+
import org.springframework.amqp.core.AmqpReplyTimeoutException;
22+
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer;
23+
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder;
24+
import org.springframework.lang.Nullable;
25+
26+
/**
27+
* A {@link Runnable} used to time out a {@link RabbitFuture}.
28+
*
29+
* @author Gary Russell
30+
* @since 2.4.7
31+
*/
32+
public class TimeoutTask implements Runnable {
33+
34+
private final RabbitFuture<?> future;
35+
36+
private final ConcurrentMap<String, RabbitFuture<?>> pending;
37+
38+
private final DirectReplyToMessageListenerContainer container;
39+
40+
TimeoutTask(RabbitFuture<?> future, ConcurrentMap<String, RabbitFuture<?>> pending,
41+
@Nullable DirectReplyToMessageListenerContainer container) {
42+
43+
this.future = future;
44+
this.pending = pending;
45+
this.container = container;
46+
}
47+
48+
@Override
49+
public void run() {
50+
this.pending.remove(this.future.getCorrelationId());
51+
ChannelHolder holder = this.future.getChannelHolder();
52+
if (holder != null && this.container != null) {
53+
this.container.releaseConsumerFor(holder, false, null); // NOSONAR
54+
}
55+
this.future.completeExceptionally(
56+
new AmqpReplyTimeoutException("Reply timed out", this.future.getRequestMessage()));
57+
}
58+
59+
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate2Tests.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -40,8 +40,6 @@
4040
import org.springframework.amqp.core.Message;
4141
import org.springframework.amqp.core.MessageProperties;
4242
import org.springframework.amqp.core.Queue;
43-
import org.springframework.amqp.rabbit.AsyncRabbitTemplate2.RabbitConverterFuture2;
44-
import org.springframework.amqp.rabbit.AsyncRabbitTemplate2.RabbitMessageFuture2;
4543
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
4644
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
4745
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@@ -253,7 +251,7 @@ public void testReturnDirect() throws Exception {
253251
@DirtiesContext
254252
public void testConvertWithConfirm() throws Exception {
255253
this.asyncTemplate.setEnableConfirms(true);
256-
RabbitConverterFuture2<String> future = this.asyncTemplate.convertSendAndReceive("sleep");
254+
RabbitConverterFuture<String> future = this.asyncTemplate.convertSendAndReceive("sleep");
257255
CompletableFuture<Boolean> confirm = future.getConfirm();
258256
assertThat(confirm).isNotNull();
259257
assertThat(confirm.get(10, TimeUnit.SECONDS)).isTrue();
@@ -264,7 +262,7 @@ public void testConvertWithConfirm() throws Exception {
264262
@DirtiesContext
265263
public void testMessageWithConfirm() throws Exception {
266264
this.asyncTemplate.setEnableConfirms(true);
267-
RabbitMessageFuture2 future = this.asyncTemplate
265+
RabbitMessageFuture future = this.asyncTemplate
268266
.sendAndReceive(new SimpleMessageConverter().toMessage("sleep", new MessageProperties()));
269267
CompletableFuture<Boolean> confirm = future.getConfirm();
270268
assertThat(confirm).isNotNull();
@@ -276,7 +274,7 @@ public void testMessageWithConfirm() throws Exception {
276274
@DirtiesContext
277275
public void testConvertWithConfirmDirect() throws Exception {
278276
this.asyncDirectTemplate.setEnableConfirms(true);
279-
RabbitConverterFuture2<String> future = this.asyncDirectTemplate.convertSendAndReceive("sleep");
277+
RabbitConverterFuture<String> future = this.asyncDirectTemplate.convertSendAndReceive("sleep");
280278
CompletableFuture<Boolean> confirm = future.getConfirm();
281279
assertThat(confirm).isNotNull();
282280
assertThat(confirm.get(10, TimeUnit.SECONDS)).isTrue();
@@ -287,7 +285,7 @@ public void testConvertWithConfirmDirect() throws Exception {
287285
@DirtiesContext
288286
public void testMessageWithConfirmDirect() throws Exception {
289287
this.asyncDirectTemplate.setEnableConfirms(true);
290-
RabbitMessageFuture2 future = this.asyncDirectTemplate
288+
RabbitMessageFuture future = this.asyncDirectTemplate
291289
.sendAndReceive(new SimpleMessageConverter().toMessage("sleep", new MessageProperties()));
292290
CompletableFuture<Boolean> confirm = future.getConfirm();
293291
assertThat(confirm).isNotNull();
@@ -321,7 +319,7 @@ public void testReceiveTimeout() throws Exception {
321319
@DirtiesContext
322320
public void testReplyAfterReceiveTimeout() throws Exception {
323321
this.asyncTemplate.setReceiveTimeout(100);
324-
RabbitConverterFuture2<String> future = this.asyncTemplate.convertSendAndReceive("sleep");
322+
RabbitConverterFuture<String> future = this.asyncTemplate.convertSendAndReceive("sleep");
325323
TheCallback callback = new TheCallback();
326324
future.whenComplete(callback);
327325
assertThat(TestUtils.getPropertyValue(this.asyncTemplate, "pending", Map.class)).hasSize(1);
@@ -351,7 +349,7 @@ public void testReplyAfterReceiveTimeout() throws Exception {
351349
@DirtiesContext
352350
public void testStopCancelled() throws Exception {
353351
this.asyncTemplate.setReceiveTimeout(5000);
354-
RabbitConverterFuture2<String> future = this.asyncTemplate.convertSendAndReceive("noReply");
352+
RabbitConverterFuture<String> future = this.asyncTemplate.convertSendAndReceive("noReply");
355353
TheCallback callback = new TheCallback();
356354
future.whenComplete(callback);
357355
assertThat(TestUtils.getPropertyValue(this.asyncTemplate, "pending", Map.class)).hasSize(1);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/AsyncListenerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import org.springframework.amqp.core.AnonymousQueue;
3838
import org.springframework.amqp.core.Queue;
3939
import org.springframework.amqp.rabbit.AsyncRabbitTemplate2;
40-
import org.springframework.amqp.rabbit.AsyncRabbitTemplate2.RabbitConverterFuture2;
40+
import org.springframework.amqp.rabbit.RabbitConverterFuture;
4141
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
4242
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
4343
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@@ -106,7 +106,7 @@ public class AsyncListenerTests {
106106
@Test
107107
public void testAsyncListener() throws Exception {
108108
assertThat(this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo")).isEqualTo("FOO");
109-
RabbitConverterFuture2<Object> future = this.asyncTemplate.convertSendAndReceive(this.queue1.getName(), "foo");
109+
RabbitConverterFuture<Object> future = this.asyncTemplate.convertSendAndReceive(this.queue1.getName(), "foo");
110110
assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo("FOO");
111111
assertThat(this.config.typeId).isEqualTo("java.lang.String");
112112
assertThat(this.rabbitTemplate.convertSendAndReceive(this.queue2.getName(), "foo")).isEqualTo("FOO");

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/ComplexTypeJsonIntegrationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.junit.jupiter.api.Test;
2525

2626
import org.springframework.amqp.rabbit.AsyncRabbitTemplate2;
27-
import org.springframework.amqp.rabbit.AsyncRabbitTemplate2.RabbitConverterFuture2;
27+
import org.springframework.amqp.rabbit.RabbitConverterFuture;
2828
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
2929
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
3030
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@@ -137,7 +137,7 @@ public void testAsyncSendAndReceive() throws Exception {
137137
new ParameterizedTypeReference<Foo<Bar<Baz, Qux>>>() { }));
138138
}
139139

140-
private void verifyFooBarBazQux(RabbitConverterFuture2<Foo<Bar<Baz, Qux>>> future) throws Exception {
140+
private void verifyFooBarBazQux(RabbitConverterFuture<Foo<Bar<Baz, Qux>>> future) throws Exception {
141141
verifyFooBarBazQux(future.get(10, TimeUnit.SECONDS));
142142
}
143143

0 commit comments

Comments
 (0)