Skip to content

Commit 4deb40c

Browse files
committed
GH-2941: Fix BlockingQueueConsumer race condition on cancel
Fixes: #2941 Issue link: #2941 Now `BlockingQueueConsumer.basicCancel()` performs `RabbitUtils.closeMessageConsumer()` to initiate `basicRecovery` on the transactional consumer to re-queue all the un-acked messages. However, there is a race condition when one in-flight message may still be delivered to the listener and then TX commit is initiated. There a `basicAck()` is initiated. However, such a tag might already be discarded because of the previous `basicRecovery`. Therefore, adjust `BlockingQueueConsumer.commitIfNecessary()` to skip `basicAck()` if locally transacted and already cancelled. Right, this may lead to the duplication delivery, but having abnormal shutdown situation we cannot guarantee that this message to commit has been processed properly. Also, adjust `BlockingQueueConsumer.nextMessage()` to rollback a message if consumer is canceled instead of going through the loop via listener * Increase `replyTimeout` in the `EnableRabbitReturnTypesTests` for resource-sensitive builds
1 parent bc81ebc commit 4deb40c

File tree

2 files changed

+41
-21
lines changed

2 files changed

+41
-21
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

+40-21
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,8 @@ public class BlockingQueueConsumer {
170170

171171
private long consumeDelay;
172172

173-
private java.util.function.Consumer<String> missingQueuePublisher = str -> { };
173+
private java.util.function.Consumer<String> missingQueuePublisher = str -> {
174+
};
174175

175176
private boolean globalQos;
176177

@@ -468,12 +469,13 @@ protected void basicCancel() {
468469

469470
protected void basicCancel(boolean expected) {
470471
this.normalCancel = expected;
472+
this.cancelled.set(true);
473+
this.abortStarted = System.currentTimeMillis();
474+
471475
Collection<String> consumerTags = getConsumerTags();
472476
if (!CollectionUtils.isEmpty(consumerTags)) {
473477
RabbitUtils.closeMessageConsumer(this.channel, consumerTags, this.transactional);
474478
}
475-
this.cancelled.set(true);
476-
this.abortStarted = System.currentTimeMillis();
477479
}
478480

479481
protected boolean hasDelivery() {
@@ -560,12 +562,26 @@ public Message nextMessage(long timeout) throws InterruptedException, ShutdownSi
560562
if (!this.missingQueues.isEmpty()) {
561563
checkMissingQueues();
562564
}
563-
Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
564-
if (message == null && this.cancelled.get()) {
565+
if (!cancelled()) {
566+
Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
567+
if (message != null && cancelled()) {
568+
this.activeObjectCounter.release(this);
569+
ConsumerCancelledException consumerCancelledException = new ConsumerCancelledException();
570+
rollbackOnExceptionIfNecessary(consumerCancelledException,
571+
message.getMessageProperties().getDeliveryTag());
572+
throw consumerCancelledException;
573+
}
574+
else {
575+
return message;
576+
}
577+
}
578+
else {
579+
this.deliveryTags.clear();
565580
this.activeObjectCounter.release(this);
566-
throw new ConsumerCancelledException();
581+
ConsumerCancelledException consumerCancelledException = new ConsumerCancelledException();
582+
rollbackOnExceptionIfNecessary(consumerCancelledException);
583+
throw consumerCancelledException;
567584
}
568-
return message;
569585
}
570586

571587
/*
@@ -792,7 +808,7 @@ public void stop() {
792808
if (this.abortStarted == 0) { // signal handle delivery to use offer
793809
this.abortStarted = System.currentTimeMillis();
794810
}
795-
if (!this.cancelled()) {
811+
if (!cancelled()) {
796812
try {
797813
RabbitUtils.closeMessageConsumer(this.channel, getConsumerTags(), this.transactional);
798814
}
@@ -894,22 +910,25 @@ boolean commitIfNecessary(boolean localTx, boolean forceAck) {
894910
/*
895911
* If we have a TX Manager, but no TX, act like we are locally transacted.
896912
*/
897-
boolean isLocallyTransacted = localTx
898-
|| (this.transactional
899-
&& TransactionSynchronizationManager.getResource(this.connectionFactory) == null);
913+
boolean isLocallyTransacted =
914+
localTx ||
915+
(this.transactional &&
916+
TransactionSynchronizationManager.getResource(this.connectionFactory) == null);
900917
try {
901918
boolean ackRequired = forceAck || (!this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual());
902919

903-
if (ackRequired && (!this.transactional || isLocallyTransacted)) {
904-
long deliveryTag = new ArrayList<>(this.deliveryTags).get(this.deliveryTags.size() - 1);
905-
try {
906-
this.channel.basicAck(deliveryTag, true);
907-
notifyMessageAckListener(true, deliveryTag, null);
908-
}
909-
catch (Exception e) {
910-
logger.error("Error acking.", e);
911-
notifyMessageAckListener(false, deliveryTag, e);
912-
}
920+
if (ackRequired && (!this.transactional || (isLocallyTransacted && !cancelled()))) {
921+
OptionalLong deliveryTag = this.deliveryTags.stream().mapToLong(l -> l).max();
922+
deliveryTag.ifPresent((tag) -> {
923+
try {
924+
this.channel.basicAck(tag, true);
925+
notifyMessageAckListener(true, tag, null);
926+
}
927+
catch (Exception e) {
928+
logger.error("Error acking.", e);
929+
notifyMessageAckListener(false, tag, e);
930+
}
931+
});
913932
}
914933

915934
if (isLocallyTransacted) {

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitReturnTypesTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(Cachi
113113
public RabbitTemplate template(CachingConnectionFactory cf, Jackson2JsonMessageConverter converter) {
114114
RabbitTemplate template = new RabbitTemplate(cf);
115115
template.setMessageConverter(converter);
116+
template.setReplyTimeout(30_000);
116117
return template;
117118
}
118119

0 commit comments

Comments
 (0)