Skip to content

Commit 8c4c1a7

Browse files
garyrussellartembilan
authored andcommitted
AMQP-828: No Exception on normal close
JIRA: https://jira.spring.io/browse/AMQP-828 JIRA: https://jira.spring.io/browse/AMQP-829 When a channel is closed normally (e.g. due to a timeout), the shutdown listener completed the future with the `ShutdownSignalException`. 2018-08-28 05:09:18,261 ERROR org.springframework.amqp.rabbit.core.RabbitTemplate [Test worker] : Consumer failed to receive message: TemplateConsumer [channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@39754e71 Shared Rabbit Connection: SimpleConnection@1a81c806 [delegate=amqp://[email protected]:5672/, localPort= 59780], consumerTag=null] com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0) This preempts the `ConsumeOkNotReceivedException` in some tests, since the future is already complete. **cherry-pick to 1.7.x** Will submit a separate PR for master since there's another future there.
1 parent 18f70d5 commit 8c4c1a7

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -1203,7 +1203,11 @@ private Delivery consumeDelivery(Channel channel, String queueName, long timeout
12031203
Delivery delivery = null;
12041204
RuntimeException exception = null;
12051205
CompletableFuture<Delivery> future = new CompletableFuture<>();
1206-
ShutdownListener shutdownListener = c -> future.completeExceptionally(c);
1206+
ShutdownListener shutdownListener = c -> {
1207+
if (!RabbitUtils.isNormalChannelClose(c)) {
1208+
future.completeExceptionally(c);
1209+
}
1210+
};
12071211
channel.addShutdownListener(shutdownListener);
12081212
ClosingRecoveryListener.addRecoveryListenerIfNecessary(channel);
12091213
DefaultConsumer consumer = createConsumer(queueName, channel, future,
@@ -1599,7 +1603,11 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
15991603

16001604
};
16011605
ClosingRecoveryListener.addRecoveryListenerIfNecessary(channel);
1602-
ShutdownListener shutdownListener = c -> pendingReply.completeExceptionally(c);
1606+
ShutdownListener shutdownListener = c -> {
1607+
if (!RabbitUtils.isNormalChannelClose(c)) {
1608+
pendingReply.completeExceptionally(c);
1609+
}
1610+
};
16031611
channel.addShutdownListener(shutdownListener);
16041612
channel.basicConsume(replyTo, true, consumerTag, this.noLocalReplyConsumer, true, null, consumer);
16051613
Message reply = null;

0 commit comments

Comments
 (0)