Skip to content

Commit 8bc8f07

Browse files
artembilanspring-builds
authored andcommitted
GH-3032: Fix BlockingQueueConsumer for in-flight draining
Fixes: #3032 Issue link: #3032 The fix for #2941 has missed "in-flight draining" for non-transactional consumers. (cherry picked from commit 993e94a)
1 parent f2db096 commit 8bc8f07

File tree

1 file changed

+22
-16
lines changed

1 file changed

+22
-16
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

+22-16
Original file line numberDiff line numberDiff line change
@@ -535,10 +535,7 @@ private Message handle(@Nullable Delivery delivery) {
535535
*/
536536
@Nullable
537537
public Message nextMessage() throws InterruptedException, ShutdownSignalException {
538-
if (logger.isTraceEnabled()) {
539-
logger.trace("Retrieving delivery for " + this);
540-
}
541-
return handle(this.queue.take());
538+
return nextMessage(-1);
542539
}
543540

544541
/**
@@ -557,26 +554,35 @@ public Message nextMessage(long timeout) throws InterruptedException, ShutdownSi
557554
if (!this.missingQueues.isEmpty()) {
558555
checkMissingQueues();
559556
}
560-
if (!cancelled()) {
561-
Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
562-
if (message != null && cancelled()) {
563-
this.activeObjectCounter.release(this);
564-
ConsumerCancelledException consumerCancelledException = new ConsumerCancelledException();
565-
rollbackOnExceptionIfNecessary(consumerCancelledException,
566-
message.getMessageProperties().getDeliveryTag());
567-
throw consumerCancelledException;
557+
558+
if (this.transactional && cancelled()) {
559+
throw consumerCancelledException(null);
560+
}
561+
else {
562+
Message message = handle(timeout < 0 ? this.queue.take() : this.queue.poll(timeout, TimeUnit.MILLISECONDS));
563+
if (cancelled() && (message == null || this.transactional)) {
564+
Long deliveryTagToNack = null;
565+
if (message != null) {
566+
deliveryTagToNack = message.getMessageProperties().getDeliveryTag();
567+
}
568+
throw consumerCancelledException(deliveryTagToNack);
568569
}
569570
else {
570571
return message;
571572
}
572573
}
574+
}
575+
576+
private ConsumerCancelledException consumerCancelledException(@Nullable Long deliveryTagToNack) {
577+
this.activeObjectCounter.release(this);
578+
ConsumerCancelledException consumerCancelledException = new ConsumerCancelledException();
579+
if (deliveryTagToNack != null) {
580+
rollbackOnExceptionIfNecessary(consumerCancelledException, deliveryTagToNack);
581+
}
573582
else {
574583
this.deliveryTags.clear();
575-
this.activeObjectCounter.release(this);
576-
ConsumerCancelledException consumerCancelledException = new ConsumerCancelledException();
577-
rollbackOnExceptionIfNecessary(consumerCancelledException);
578-
throw consumerCancelledException;
579584
}
585+
return consumerCancelledException;
580586
}
581587

582588
/*

0 commit comments

Comments
 (0)