Skip to content

Commit bf979dd

Browse files
garyrussellartembilan
authored andcommitted
GH-1055: Invoke Conf/Return callbacks on executor
Resolves #1055 Prevent deadlocks in the amqp-client. The reason the existing test didn't catch this is because the nested send was performed on a different template/connection. * Doc polishing
1 parent 07159e0 commit bf979dd

File tree

5 files changed

+79
-32
lines changed

5 files changed

+79
-32
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelImpl.java

Lines changed: 49 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
import java.util.concurrent.ConcurrentHashMap;
3333
import java.util.concurrent.ConcurrentMap;
3434
import java.util.concurrent.ConcurrentSkipListMap;
35+
import java.util.concurrent.CountDownLatch;
3536
import java.util.concurrent.ExecutorService;
37+
import java.util.concurrent.TimeUnit;
3638
import java.util.concurrent.TimeoutException;
3739

3840
import org.apache.commons.logging.Log;
@@ -92,8 +94,9 @@
9294
public class PublisherCallbackChannelImpl
9395
implements PublisherCallbackChannel, ConfirmListener, ReturnListener, ShutdownListener {
9496

95-
private static final MessagePropertiesConverter converter // NOSONAR - lower case
96-
= new DefaultMessagePropertiesConverter();
97+
private static final MessagePropertiesConverter CONVERTER = new DefaultMessagePropertiesConverter();
98+
99+
private static final long RETURN_CALLBACK_TIMEOUT = 60;
97100

98101
private final Log logger = LogFactory.getLog(this.getClass());
99102

@@ -109,8 +112,12 @@ public class PublisherCallbackChannelImpl
109112

110113
private final ExecutorService executor;
111114

115+
private final CountDownLatch returnLatch = new CountDownLatch(1);
116+
112117
private volatile java.util.function.Consumer<Channel> afterAckCallback;
113118

119+
private boolean hasReturned;
120+
114121
/**
115122
* Create a {@link PublisherCallbackChannelImpl} instance based on the provided
116123
* delegate and executor.
@@ -931,17 +938,6 @@ private synchronized void processAck(long seq, boolean ack, boolean multiple, bo
931938
catch (Exception e) {
932939
this.logger.error("Failed to process publisher confirm", e);
933940
}
934-
finally {
935-
try {
936-
if (this.afterAckCallback != null && getPendingConfirmsCount() == 0) {
937-
this.afterAckCallback.accept(this);
938-
this.afterAckCallback = null;
939-
}
940-
}
941-
catch (Exception e) {
942-
this.logger.error("Failed to invoke afterAckCallback", e);
943-
}
944-
}
945941
}
946942

947943
private void doProcessAck(long seq, boolean ack, boolean multiple, boolean remove) {
@@ -1017,17 +1013,34 @@ private void processMultipleAck(long seq, boolean ack) {
10171013
}
10181014

10191015
private void doHandleConfirm(boolean ack, Listener listener, PendingConfirm pendingConfirm) {
1020-
try {
1021-
if (listener.isConfirmListener()) {
1022-
if (this.logger.isDebugEnabled()) {
1023-
this.logger.debug("Sending confirm " + pendingConfirm);
1016+
this.executor.execute(() -> {
1017+
try {
1018+
if (listener.isConfirmListener()) {
1019+
if (this.hasReturned && !this.returnLatch.await(RETURN_CALLBACK_TIMEOUT, TimeUnit.SECONDS)) {
1020+
this.logger
1021+
.error("Return callback failed to execute in " + RETURN_CALLBACK_TIMEOUT + " seconds");
1022+
}
1023+
if (this.logger.isDebugEnabled()) {
1024+
this.logger.debug("Sending confirm " + pendingConfirm);
1025+
}
1026+
listener.handleConfirm(pendingConfirm, ack);
10241027
}
1025-
listener.handleConfirm(pendingConfirm, ack);
10261028
}
1027-
}
1028-
catch (Exception e) {
1029-
this.logger.error("Exception delivering confirm", e);
1030-
}
1029+
catch (Exception e) {
1030+
this.logger.error("Exception delivering confirm", e);
1031+
}
1032+
finally {
1033+
try {
1034+
if (this.afterAckCallback != null && getPendingConfirmsCount() == 0) {
1035+
this.afterAckCallback.accept(this);
1036+
this.afterAckCallback = null;
1037+
}
1038+
}
1039+
catch (Exception e) {
1040+
this.logger.error("Failed to invoke afterAckCallback", e);
1041+
}
1042+
}
1043+
});
10311044
}
10321045

10331046
@Override
@@ -1058,7 +1071,7 @@ public void handleReturn(int replyCode,
10581071
if (returnCorrelation != null) {
10591072
PendingConfirm confirm = this.pendingReturns.remove(returnCorrelation.toString());
10601073
if (confirm != null) {
1061-
MessageProperties messageProperties = converter.toMessageProperties(properties,
1074+
MessageProperties messageProperties = CONVERTER.toMessageProperties(properties,
10621075
new Envelope(0L, false, exchange, routingKey), StandardCharsets.UTF_8.name());
10631076
if (confirm.getCorrelationData() != null) {
10641077
confirm.getCorrelationData().setReturnedMessage(new Message(body, messageProperties)); // NOSONAR never null
@@ -1083,12 +1096,19 @@ public void handleReturn(int replyCode,
10831096
}
10841097
}
10851098
else {
1086-
try {
1087-
listener.handleReturn(replyCode, replyText, exchange, routingKey, properties, body);
1088-
}
1089-
catch (Exception e) {
1090-
this.logger.error("Exception delivering returned message ", e);
1091-
}
1099+
this.hasReturned = true;
1100+
Listener listenerToInvoke = listener;
1101+
this.executor.execute(() -> {
1102+
try {
1103+
listenerToInvoke.handleReturn(replyCode, replyText, exchange, routingKey, properties, body);
1104+
}
1105+
catch (Exception e) {
1106+
this.logger.error("Exception delivering returned message ", e);
1107+
}
1108+
finally {
1109+
this.returnLatch.countDown();
1110+
}
1111+
});
10921112
}
10931113
}
10941114

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegrationTests.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,11 @@ public void testPublisherConfirmReceived() throws Exception {
157157
final CountDownLatch latch = new CountDownLatch(10000);
158158
final AtomicInteger acks = new AtomicInteger();
159159
final AtomicReference<CorrelationData> confirmCorrelation = new AtomicReference<CorrelationData>();
160+
AtomicReference<String> callbackThreadName = new AtomicReference<>();
160161
this.templateWithConfirmsEnabled.setConfirmCallback((correlationData, ack, cause) -> {
161162
acks.incrementAndGet();
162163
confirmCorrelation.set(correlationData);
164+
callbackThreadName.set(Thread.currentThread().getName());
163165
latch.countDown();
164166
});
165167
this.templateWithConfirmsEnabled.setCorrelationDataPostProcessor((m, c) -> new CorrelationData("abc"));
@@ -213,6 +215,7 @@ public Message postProcessMessage(Message message, Correlation correlation) {
213215
new DirectFieldAccessor(connectionFactoryWithConfirmsEnabled).setPropertyValue("logger", logger);
214216
cleanUp();
215217
verify(logger, never()).error(any());
218+
assertThat(callbackThreadName.get()).startsWith("spring-rabbit-deferred-pool");
216219
}
217220

218221
@Test
@@ -848,23 +851,29 @@ public void testWithFuture() throws Exception {
848851
assertThat(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck()).isTrue();
849852
CorrelationData cd2 = new CorrelationData();
850853
this.templateWithConfirmsEnabled.convertAndSend("", queue.getName(), "bar", cd2);
851-
// TODO: Uncomment when travis updates to rabbitmq 3.7
852-
// assertFalse(cd2.getFuture().get(10, TimeUnit.SECONDS).isAck());
854+
assertThat(cd2.getFuture().get(10, TimeUnit.SECONDS).isAck()).isFalse();
853855
CorrelationData cd3 = new CorrelationData();
854856
this.templateWithConfirmsEnabled.convertAndSend("NO_EXCHANGE_HERE", queue.getName(), "foo", cd3);
855857
assertThat(cd3.getFuture().get(10, TimeUnit.SECONDS).isAck()).isFalse();
856858
assertThat(cd3.getFuture().get().getReason()).contains("NOT_FOUND");
857859
CorrelationData cd4 = new CorrelationData("42");
858860
AtomicBoolean resent = new AtomicBoolean();
861+
AtomicReference<String> callbackThreadName = new AtomicReference<>();
862+
CountDownLatch callbackLatch = new CountDownLatch(1);
859863
this.templateWithConfirmsAndReturnsEnabled.setReturnCallback((m, r, rt, e, rk) -> {
860-
this.templateWithConfirmsEnabled.send(ROUTE, m);
864+
this.templateWithConfirmsAndReturnsEnabled.send(ROUTE, m);
865+
callbackThreadName.set(Thread.currentThread().getName());
861866
resent.set(true);
867+
callbackLatch.countDown();
862868
});
863869
this.templateWithConfirmsAndReturnsEnabled.convertAndSend("", "NO_QUEUE_HERE", "foo", cd4);
864870
assertThat(cd4.getFuture().get(10, TimeUnit.SECONDS).isAck()).isTrue();
871+
assertThat(callbackLatch.await(10, TimeUnit.SECONDS)).isTrue();
865872
assertThat(cd4.getReturnedMessage()).isNotNull();
866873
assertThat(resent.get()).isTrue();
874+
assertThat(callbackThreadName.get()).startsWith("spring-rabbit-deferred-pool");
867875
admin.deleteQueue(queue.getName());
876+
868877
}
869878

870879
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegrationTests3.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import java.util.concurrent.CountDownLatch;
2323
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicBoolean;
2425
import java.util.concurrent.atomic.AtomicInteger;
2526

2627
import org.junit.jupiter.api.Test;
@@ -78,8 +79,11 @@ public void testDeferredChannelCacheNack() throws Exception {
7879
final CountDownLatch returnLatch = new CountDownLatch(1);
7980
final CountDownLatch confirmLatch = new CountDownLatch(1);
8081
final AtomicInteger cacheCount = new AtomicInteger();
82+
final AtomicBoolean returnCalledFirst = new AtomicBoolean();
8183
template.setConfirmCallback((cd, a, c) -> {
84+
CachingConnectionFactory lcf = cf;
8285
cacheCount.set(TestUtils.getPropertyValue(cf, "cachedChannelsNonTransactional", List.class).size());
86+
returnCalledFirst.set(returnLatch.getCount() == 0);
8387
confirmLatch.countDown();
8488
});
8589
template.setReturnCallback((m, r, rt, e, rk) -> {
@@ -97,6 +101,7 @@ public void testDeferredChannelCacheNack() throws Exception {
97101
assertThat(returnLatch.await(10, TimeUnit.SECONDS)).isTrue();
98102
assertThat(confirmLatch.await(10, TimeUnit.SECONDS)).isTrue();
99103
assertThat(cacheCount.get()).isEqualTo(1);
104+
assertThat(returnCalledFirst.get()).isTrue();
100105
cf.destroy();
101106
}
102107

src/reference/asciidoc/amqp.adoc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,6 +1096,15 @@ Version 2.1 and later no longer return the channel to the cache while confirms a
10961096
The `RabbitTemplate` performs a logical `close()` on the channel after each operation.
10971097
In general, this means that only one confirm is outstanding on a channel at a time.
10981098

1099+
NOTE: Starting with version 2.2, the callbacks are invoked on one of the connection factory's `executor` threads.
1100+
This is to avoid a potential deadlock if you perform Rabbit operations from within the callback.
1101+
With previous versions, the callbacks were invoked directly on the `amqp-client` connection I/O thread; this would deadlock if you perform some RPC operation (such as opening a new channel) since the I/O thread blocks waiting for the result, but the result needs to be processed by the I/O thread itself.
1102+
With those versions, it was necessary to hand off work (such as sending a messasge) to another thread within the callback.
1103+
This is no longer necessary since the framework now hands off the callback invocation to the executor.
1104+
1105+
IMPORTANT: The guarantee of receiving a returned message before the ack is still maintained as long as the return callback executes in 60 seconds or less.
1106+
The confirm is scheduled to be delivered after the return callback exits or after 60 seconds, whichever comes first.
1107+
10991108
Starting with version 2.1, the `CorrelationData` object has a `ListenableFuture` that you can \used to get the result, instead of using a `ConfirmCallback` on the template.
11001109
The following example shows how to configure a `CorrelationData` instance:
11011110

src/reference/asciidoc/whats-new.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ The `CachingConnectionFactory` has a new property `shuffleAddresses`.
6666
When providing a list of broker node addresses, the list will be shuffled before creating a connection so that the order in which the connections are attempted is random.
6767
See <<cluster>> for more information.
6868

69+
When using Publisher confirms and returns, the callbacks are now invoked on the connection factory's `executor`.
70+
This avoids a possible deadlock in the `amqp-clients` library if you perform rabbit operations from within the callback.
71+
See <<template-confirms>> for more information.
72+
6973
===== Other Changes
7074

7175
The `Declarables` object (for declaring multiple queues, exchanges, bindings) now has a filtered getter for each type.

0 commit comments

Comments
 (0)