Skip to content

Commit de20111

Browse files
committed
GH-2941: Fix BlockingQueueConsumer race condition on cancel
Fixes: #2941 Issue link: #2941 Now `BlockingQueueConsumer.basicCancel()` performs `RabbitUtils.closeMessageConsumer()` to initiate `basicRecovery` on the transactional consumer to re-queue all the un-acked messages. However, there is a race condition when one in-flight message may still be delivered to the listener and then TX commit is initiated. There a `basicAck()` is initiated. However, such a tag might already be discarded because of the previous `basicRecovery`. Therefore, adjust `BlockingQueueConsumer.commitIfNecessary()` to skip `basicAck()` if locally transacted and already cancelled. Right, this may lead to the duplication delivery, but having abnormal shutdown situation we cannot guarantee that this message to commit has been processed properly. Also, adjust `BlockingQueueConsumer.nextMessage()` to rollback a message if consumer is cancelled instead of going through the loop via listener * Increase `replyTimeout` in the `EnableRabbitReturnTypesTests` for resource-sensitive builds
1 parent c17b789 commit de20111

File tree

2 files changed

+41
-21
lines changed

2 files changed

+41
-21
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

+40-21
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,8 @@ public class BlockingQueueConsumer {
168168

169169
private long consumeDelay;
170170

171-
private java.util.function.Consumer<String> missingQueuePublisher = str -> { };
171+
private java.util.function.Consumer<String> missingQueuePublisher = str -> {
172+
};
172173

173174
private boolean globalQos;
174175

@@ -465,12 +466,13 @@ protected void basicCancel() {
465466

466467
protected void basicCancel(boolean expected) {
467468
this.normalCancel = expected;
469+
this.cancelled.set(true);
470+
this.abortStarted = System.currentTimeMillis();
471+
468472
Collection<String> consumerTags = getConsumerTags();
469473
if (!CollectionUtils.isEmpty(consumerTags)) {
470474
RabbitUtils.closeMessageConsumer(this.channel, consumerTags, this.transactional);
471475
}
472-
this.cancelled.set(true);
473-
this.abortStarted = System.currentTimeMillis();
474476
}
475477

476478
protected boolean hasDelivery() {
@@ -556,12 +558,26 @@ public Message nextMessage(long timeout) throws InterruptedException, ShutdownSi
556558
if (!this.missingQueues.isEmpty()) {
557559
checkMissingQueues();
558560
}
559-
Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
560-
if (message == null && this.cancelled.get()) {
561+
if (!cancelled()) {
562+
Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
563+
if (message != null && cancelled()) {
564+
this.activeObjectCounter.release(this);
565+
ConsumerCancelledException consumerCancelledException = new ConsumerCancelledException();
566+
rollbackOnExceptionIfNecessary(consumerCancelledException,
567+
message.getMessageProperties().getDeliveryTag());
568+
throw consumerCancelledException;
569+
}
570+
else {
571+
return message;
572+
}
573+
}
574+
else {
575+
this.deliveryTags.clear();
561576
this.activeObjectCounter.release(this);
562-
throw new ConsumerCancelledException();
577+
ConsumerCancelledException consumerCancelledException = new ConsumerCancelledException();
578+
rollbackOnExceptionIfNecessary(consumerCancelledException);
579+
throw consumerCancelledException;
563580
}
564-
return message;
565581
}
566582

567583
/*
@@ -786,7 +802,7 @@ public void stop() {
786802
if (this.abortStarted == 0) { // signal handle delivery to use offer
787803
this.abortStarted = System.currentTimeMillis();
788804
}
789-
if (!this.cancelled()) {
805+
if (!cancelled()) {
790806
try {
791807
RabbitUtils.closeMessageConsumer(this.channel, getConsumerTags(), this.transactional);
792808
}
@@ -902,22 +918,25 @@ boolean commitIfNecessary(boolean localTx, boolean forceAck) {
902918
/*
903919
* If we have a TX Manager, but no TX, act like we are locally transacted.
904920
*/
905-
boolean isLocallyTransacted = localTx
906-
|| (this.transactional
907-
&& TransactionSynchronizationManager.getResource(this.connectionFactory) == null);
921+
boolean isLocallyTransacted =
922+
localTx ||
923+
(this.transactional &&
924+
TransactionSynchronizationManager.getResource(this.connectionFactory) == null);
908925
try {
909926
boolean ackRequired = forceAck || (!this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual());
910927

911-
if (ackRequired && (!this.transactional || isLocallyTransacted)) {
912-
long deliveryTag = new ArrayList<>(this.deliveryTags).get(this.deliveryTags.size() - 1);
913-
try {
914-
this.channel.basicAck(deliveryTag, true);
915-
notifyMessageAckListener(true, deliveryTag, null);
916-
}
917-
catch (Exception e) {
918-
logger.error("Error acking.", e);
919-
notifyMessageAckListener(false, deliveryTag, e);
920-
}
928+
if (ackRequired && (!this.transactional || (isLocallyTransacted && !cancelled()))) {
929+
OptionalLong deliveryTag = this.deliveryTags.stream().mapToLong(l -> l).max();
930+
deliveryTag.ifPresent((tag) -> {
931+
try {
932+
this.channel.basicAck(tag, true);
933+
notifyMessageAckListener(true, tag, null);
934+
}
935+
catch (Exception e) {
936+
logger.error("Error acking.", e);
937+
notifyMessageAckListener(false, tag, e);
938+
}
939+
});
921940
}
922941

923942
if (isLocallyTransacted) {

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitReturnTypesTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(Cachi
113113
public RabbitTemplate template(CachingConnectionFactory cf, Jackson2JsonMessageConverter converter) {
114114
RabbitTemplate template = new RabbitTemplate(cf);
115115
template.setMessageConverter(converter);
116+
template.setReplyTimeout(30_000);
116117
return template;
117118
}
118119

0 commit comments

Comments
 (0)