Skip to content

Commit 5f26828

Browse files
artembilanspring-builds
authored andcommitted
GH-3039: Move Recovery in BlockingQueueConsumer into stop()
Fixes: #3039 Issue link: #3039 Currently, the `BlockingQueueConsumer` initiates a Basic Recovery command on the channel for transactional consumer immediately after Basic Cancel. However, it is possible still to try to handle in-flight messages during `shutdownTimeout` in the listener container * Leave only Basic Cancel command in the `BlockingQueueConsumer.basicCancel()` API * Revert `BlockingQueueConsumer.nextMessage(timeout)` method logic to normal loop until message pulled from the in-memory cache is `null` * Call `basicCancel(true)` from the `stop()` is not cancelled yet * Perform `channel.basicRecover()` for transactional channel in the `stop()`. This `stop()` is usually called from the listener container when in-flight messages have not been processed during `shutdownTimeout` (cherry picked from commit 14fe215)
1 parent a9e27b9 commit 5f26828

File tree

1 file changed

+26
-40
lines changed

1 file changed

+26
-40
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

+26-40
Original file line numberDiff line numberDiff line change
@@ -460,18 +460,19 @@ int getQueueCount() {
460460
}
461461

462462
protected void basicCancel() {
463-
basicCancel(false);
463+
basicCancel(true);
464464
}
465465

466466
protected void basicCancel(boolean expected) {
467467
this.normalCancel = expected;
468+
getConsumerTags()
469+
.forEach(consumerTag -> {
470+
if (this.channel.isOpen()) {
471+
RabbitUtils.cancel(this.channel, consumerTag);
472+
}
473+
});
468474
this.cancelled.set(true);
469475
this.abortStarted = System.currentTimeMillis();
470-
471-
Collection<String> consumerTags = getConsumerTags();
472-
if (!CollectionUtils.isEmpty(consumerTags)) {
473-
RabbitUtils.closeMessageConsumer(this.channel, consumerTags, this.transactional);
474-
}
475476
}
476477

477478
protected boolean hasDelivery() {
@@ -554,35 +555,12 @@ public Message nextMessage(long timeout) throws InterruptedException, ShutdownSi
554555
if (!this.missingQueues.isEmpty()) {
555556
checkMissingQueues();
556557
}
557-
558-
if (this.transactional && cancelled()) {
559-
throw consumerCancelledException(null);
560-
}
561-
else {
562-
Message message = handle(timeout < 0 ? this.queue.take() : this.queue.poll(timeout, TimeUnit.MILLISECONDS));
563-
if (cancelled() && (message == null || this.transactional)) {
564-
Long deliveryTagToNack = null;
565-
if (message != null) {
566-
deliveryTagToNack = message.getMessageProperties().getDeliveryTag();
567-
}
568-
throw consumerCancelledException(deliveryTagToNack);
569-
}
570-
else {
571-
return message;
572-
}
573-
}
574-
}
575-
576-
private ConsumerCancelledException consumerCancelledException(@Nullable Long deliveryTagToNack) {
577-
this.activeObjectCounter.release(this);
578-
ConsumerCancelledException consumerCancelledException = new ConsumerCancelledException();
579-
if (deliveryTagToNack != null) {
580-
rollbackOnExceptionIfNecessary(consumerCancelledException, deliveryTagToNack);
581-
}
582-
else {
583-
this.deliveryTags.clear();
558+
Message message = handle(timeout < 0 ? this.queue.take() : this.queue.poll(timeout, TimeUnit.MILLISECONDS));
559+
if (message == null && this.cancelled.get()) {
560+
this.activeObjectCounter.release(this);
561+
throw new ConsumerCancelledException();
584562
}
585-
return consumerCancelledException;
563+
return message;
586564
}
587565

588566
/*
@@ -808,13 +786,21 @@ public void stop() {
808786
this.abortStarted = System.currentTimeMillis();
809787
}
810788
if (!cancelled()) {
811-
try {
812-
RabbitUtils.closeMessageConsumer(this.channel, getConsumerTags(), this.transactional);
789+
basicCancel(true);
790+
}
791+
try {
792+
if (this.transactional) {
793+
/*
794+
* Re-queue in-flight messages if any
795+
* (after the consumer is cancelled to prevent the broker from simply sending them back to us).
796+
* Does not require a tx.commit.
797+
*/
798+
this.channel.basicRecover(true);
813799
}
814-
catch (Exception e) {
815-
if (logger.isDebugEnabled()) {
816-
logger.debug("Error closing consumer " + this, e);
817-
}
800+
}
801+
catch (Exception e) {
802+
if (logger.isDebugEnabled()) {
803+
logger.debug("Error closing consumer " + this, e);
818804
}
819805
}
820806
if (logger.isDebugEnabled()) {

0 commit comments

Comments
 (0)