From f446db11957c606239c3b9492ddd1850c0f5d16d Mon Sep 17 00:00:00 2001 From: Jeonggi Kim Date: Mon, 22 Jan 2024 11:59:14 +0900 Subject: [PATCH 1/3] GH-2601: Add a batchReceiveTimeout Fixes https://github.com/spring-projects/spring-amqp/issues/2601 stop to waiting next message and execute listener when batchReceiveTimeout is timed out. --- .../config/ListenerContainerFactoryBean.java | 10 +++- .../SimpleRabbitListenerContainerFactory.java | 16 +++++- .../SimpleMessageListenerContainer.java | 23 +++++++- .../SimpleMessageListenerContainerTests.java | 54 +++++++++++++++++++ 4 files changed, 99 insertions(+), 4 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java index a23d29ca83..748f088f28 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,6 +55,7 @@ * @author Gary Russell * @author Artem Bilan * @author Johno Crawford + * @author Jeonggi Kim * * @since 2.0 * @@ -166,6 +167,8 @@ public class ListenerContainerFactoryBean extends AbstractFactoryBean @@ -552,6 +559,7 @@ private AbstractMessageListenerContainer createContainer() { .acceptIfNotNull(this.consecutiveActiveTrigger, container::setConsecutiveActiveTrigger) .acceptIfNotNull(this.consecutiveIdleTrigger, container::setConsecutiveIdleTrigger) .acceptIfNotNull(this.receiveTimeout, container::setReceiveTimeout) + .acceptIfNotNull(this.batchReceiveTimeout, container::setBatchReceiveTimeout) .acceptIfNotNull(this.batchSize, container::setBatchSize) .acceptIfNotNull(this.consumerBatchEnabled, container::setConsumerBatchEnabled) .acceptIfNotNull(this.declarationRetries, container::setDeclarationRetries) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java index 2e8c5afdd6..6a16d53d83 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2022 the original author or authors. + * Copyright 2014-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +32,7 @@ * @author Gary Russell * @author Artem Bilan * @author Dustin Schultz + * @author Jeonggi Kim * * @since 1.4 */ @@ -54,6 +55,8 @@ public class SimpleRabbitListenerContainerFactory private Long receiveTimeout; + private Long batchReceiveTimeout; + private Boolean consumerBatchEnabled; /** @@ -121,6 +124,14 @@ public void setReceiveTimeout(Long receiveTimeout) { this.receiveTimeout = receiveTimeout; } + /** + * @param batchReceiveTimeout the timeout for gather batch messages. + * @see SimpleMessageListenerContainer#setBatchReceiveTimeout + */ + public void setBatchReceiveTimeout(Long batchReceiveTimeout) { + this.batchReceiveTimeout = batchReceiveTimeout; + } + /** * Set to true to present a list of messages based on the {@link #setBatchSize(Integer)}, * if the listener supports it. Starting with version 3.0, setting this to true will @@ -163,7 +174,8 @@ protected void initializeContainer(SimpleMessageListenerContainer instance, Rabb .acceptIfNotNull(this.stopConsumerMinInterval, instance::setStopConsumerMinInterval) .acceptIfNotNull(this.consecutiveActiveTrigger, instance::setConsecutiveActiveTrigger) .acceptIfNotNull(this.consecutiveIdleTrigger, instance::setConsecutiveIdleTrigger) - .acceptIfNotNull(this.receiveTimeout, instance::setReceiveTimeout); + .acceptIfNotNull(this.receiveTimeout, instance::setReceiveTimeout) + .acceptIfNotNull(this.batchReceiveTimeout, instance::setBatchReceiveTimeout); if (Boolean.TRUE.equals(this.consumerBatchEnabled)) { instance.setConsumerBatchEnabled(true); /* diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java index 9007bfda0d..6560ef5bba 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -80,6 +80,7 @@ * @author Mat Jaggard * @author Yansong Ren * @author Tim Bourquin + * @author Jeonggi Kim * * @since 1.0 */ @@ -97,6 +98,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta private static final int DEFAULT_CONSECUTIVE_IDLE_TRIGGER = 10; + private static final long DEFAULT_BATCH_RECEIVE_TIMEOUT = 3000; + public static final long DEFAULT_RECEIVE_TIMEOUT = 1000; private final AtomicLong lastNoMessageAlert = new AtomicLong(); @@ -121,6 +124,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT; + private long batchReceiveTimeout = DEFAULT_BATCH_RECEIVE_TIMEOUT; + private Set consumers; private Integer declarationRetries; @@ -330,6 +335,16 @@ public void setReceiveTimeout(long receiveTimeout) { this.receiveTimeout = receiveTimeout; } + /** + * The time (in milliseconds) that a waiting time to fill batch size. Default + * 3000 (3 second). + * @param batchReceiveTimeout the timeout + */ + public void setBatchReceiveTimeout(long batchReceiveTimeout) { + Assert.isTrue(batchReceiveTimeout > 0, "'batchReceiveTimeout' must be > 0"); + this.batchReceiveTimeout = batchReceiveTimeout; + } + /** * This property has several functions. *

@@ -996,8 +1011,14 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep List messages = null; long deliveryTag = 0; + long startTime = System.currentTimeMillis(); for (int i = 0; i < this.batchSize; i++) { + boolean batchTimedOut = (System.currentTimeMillis() - startTime) > this.batchReceiveTimeout; + if (batchTimedOut) { + logger.trace("Timed out for gather batch messages."); + break; + } logger.trace("Waiting for message from consumer."); Message message = consumer.nextMessage(this.receiveTimeout); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java index 7044f5ee6c..26c72b14e3 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java @@ -110,6 +110,7 @@ * @author Mohammad Hewedy * @author Yansong Ren * @author Tim Bourquin + * @author Jeonggi Kim */ public class SimpleMessageListenerContainerTests { @@ -784,6 +785,59 @@ void testWithConsumerStartWhenNotActive() { assertThat(start.getCount()).isEqualTo(0L); } + @Test + public void testBatchReceiveTimedOut() throws Exception { + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); + Connection connection = mock(Connection.class); + Channel channel = mock(Channel.class); + given(connectionFactory.createConnection()).willReturn(connection); + given(connection.createChannel(false)).willReturn(channel); + final AtomicReference consumer = new AtomicReference<>(); + willAnswer(invocation -> { + consumer.set(invocation.getArgument(6)); + consumer.get().handleConsumeOk("1"); + return "1"; + }).given(channel) + .basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyMap(), + any(Consumer.class)); + final CountDownLatch latch = new CountDownLatch(2); + willAnswer(invocation -> { + latch.countDown(); + return null; + }).given(channel).basicAck(anyLong(), anyBoolean()); + + final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); + container.setAfterReceivePostProcessors(msg -> null); + container.setQueueNames("foo"); + MessageListener listener = mock(BatchMessageListener.class); + container.setMessageListener(listener); + container.setBatchSize(3); + container.setConsumerBatchEnabled(true); + container.setReceiveTimeout(10); + container.setBatchReceiveTimeout(20); + container.start(); + + BasicProperties props = new BasicProperties(); + byte[] payload = "baz".getBytes(); + Envelope envelope = new Envelope(1L, false, "foo", "bar"); + consumer.get().handleDelivery("1", envelope, props, payload); + envelope = new Envelope(2L, false, "foo", "bar"); + consumer.get().handleDelivery("1", envelope, props, payload); + // waiting for batch receive timed out + Thread.sleep(20); + envelope = new Envelope(3L, false, "foo", "bar"); + consumer.get().handleDelivery("1", envelope, props, payload); + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + verify(channel, never()).basicAck(eq(1), anyBoolean()); + verify(channel).basicAck(2, true); + verify(channel, never()).basicAck(eq(2), anyBoolean()); + verify(channel).basicAck(3, true); + container.stop(); + verify(listener).containerAckMode(AcknowledgeMode.AUTO); + verify(listener).isAsyncReplies(); + verifyNoMoreInteractions(listener); + } + private Answer messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container, final boolean cancel, final CountDownLatch latch) { return invocation -> { From c2ee36044ff09fc6920dda8107a44373841fa03d Mon Sep 17 00:00:00 2001 From: Jeonggi Kim Date: Tue, 23 Jan 2024 11:05:00 +0900 Subject: [PATCH 2/3] PR review --- .../config/ListenerContainerFactoryBean.java | 8 ++++++ .../SimpleRabbitListenerContainerFactory.java | 7 ++++- .../SimpleMessageListenerContainer.java | 26 ++++++++++++------- .../ROOT/pages/amqp/containerAttributes.adoc | 14 +++++++++- 4 files changed, 44 insertions(+), 11 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java index 748f088f28..0a070e1eca 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java @@ -392,6 +392,14 @@ public void setReceiveTimeout(long receiveTimeout) { this.receiveTimeout = receiveTimeout; } + /** + * The number of milliseconds of timeout for gathering batch messages. + * It limits the time to wait to fill batchSize. + * Default is 0 (no timeout). + * @param batchReceiveTimeout the timeout for gathering batch messages. + * @since 3.1.2 + * @see #setBatchSize(int) + */ public void setBatchReceiveTimeout(long batchReceiveTimeout) { this.batchReceiveTimeout = batchReceiveTimeout; } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java index 6a16d53d83..baa4c975df 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java @@ -125,8 +125,13 @@ public void setReceiveTimeout(Long receiveTimeout) { } /** - * @param batchReceiveTimeout the timeout for gather batch messages. + * The number of milliseconds of timeout for gathering batch messages. + * It limits the time to wait to fill batchSize. + * Default is 0 (no timeout). + * @param batchReceiveTimeout the timeout for gathering batch messages. + * @since 3.1.2 * @see SimpleMessageListenerContainer#setBatchReceiveTimeout + * @see #setBatchSize(Integer) */ public void setBatchReceiveTimeout(Long batchReceiveTimeout) { this.batchReceiveTimeout = batchReceiveTimeout; diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java index 6560ef5bba..90ed5fece1 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java @@ -98,8 +98,6 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta private static final int DEFAULT_CONSECUTIVE_IDLE_TRIGGER = 10; - private static final long DEFAULT_BATCH_RECEIVE_TIMEOUT = 3000; - public static final long DEFAULT_RECEIVE_TIMEOUT = 1000; private final AtomicLong lastNoMessageAlert = new AtomicLong(); @@ -124,7 +122,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT; - private long batchReceiveTimeout = DEFAULT_BATCH_RECEIVE_TIMEOUT; + private long batchReceiveTimeout; private Set consumers; @@ -336,12 +334,15 @@ public void setReceiveTimeout(long receiveTimeout) { } /** - * The time (in milliseconds) that a waiting time to fill batch size. Default - * 3000 (3 second). - * @param batchReceiveTimeout the timeout + * The number of milliseconds of timeout for gathering batch messages. + * It limits the time to wait to fill batchSize. + * Default is 0 (no timeout). + * @param batchReceiveTimeout the timeout for gathering batch messages. + * @since 3.1.2 + * @see #setBatchSize(int) */ public void setBatchReceiveTimeout(long batchReceiveTimeout) { - Assert.isTrue(batchReceiveTimeout > 0, "'batchReceiveTimeout' must be > 0"); + Assert.isTrue(batchReceiveTimeout >= 0, "'batchReceiveTimeout' must be >= 0"); this.batchReceiveTimeout = batchReceiveTimeout; } @@ -1014,9 +1015,16 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep long startTime = System.currentTimeMillis(); for (int i = 0; i < this.batchSize; i++) { - boolean batchTimedOut = (System.currentTimeMillis() - startTime) > this.batchReceiveTimeout; + boolean batchTimedOut = this.batchReceiveTimeout > 0 && + (System.currentTimeMillis() - startTime) > this.batchReceiveTimeout; if (batchTimedOut) { - logger.trace("Timed out for gather batch messages."); + if (logger.isTraceEnabled()) { + long gathered = 0; + if (messages != null) { + gathered = messages.size(); + } + logger.trace("Timed out for gathering batch messages. gathered size is " + gathered); + } break; } diff --git a/src/reference/antora/modules/ROOT/pages/amqp/containerAttributes.adoc b/src/reference/antora/modules/ROOT/pages/amqp/containerAttributes.adoc index 4f9d827f33..9aec7d7c9b 100644 --- a/src/reference/antora/modules/ROOT/pages/amqp/containerAttributes.adoc +++ b/src/reference/antora/modules/ROOT/pages/amqp/containerAttributes.adoc @@ -198,7 +198,7 @@ a| |[[consumerBatchEnabled]]<> + (batch-enabled) -|If the `MessageListener` supports it, setting this to true enables batching of discrete messages, up to `batchSize`; a partial batch will be delivered if no new messages arrive in `receiveTimeout`. +|If the `MessageListener` supports it, setting this to true enables batching of discrete messages, up to `batchSize`; a partial batch will be delivered if no new messages arrive in `receiveTimeout` or gathering batch messages time exceeded `batchReceiveTimeout`. When this is false, batching is only supported for batches created by a producer; see xref:amqp/sending-messages.adoc#template-batching[Batching]. a|image::tickmark.png[] @@ -611,6 +611,18 @@ a|image::tickmark.png[] a| a| +|[[batchReceiveTimeout]]<> + +(batch-receive-timeout) + +|The number of milliseconds of timeout for gathering batch messages. +It limits the time to wait to fill batchSize. +When `batchSize > 1` and the time to gathering batch messages is greater than `batchReceiveTime`, batch will be delivered. +Default is 0 (no timeout). + +a|image::tickmark.png[] +a| +a| + |[[recoveryBackOff]]<> + (recovery-back-off) From d33f2e19efee4d68823ff55600ebac3dabc084fc Mon Sep 17 00:00:00 2001 From: Jeonggi Kim Date: Thu, 25 Jan 2024 09:24:59 +0900 Subject: [PATCH 3/3] PR review --- .../listener/SimpleMessageListenerContainer.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java index 90ed5fece1..737ee14716 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java @@ -1012,17 +1012,14 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep List messages = null; long deliveryTag = 0; - long startTime = System.currentTimeMillis(); - + boolean isBatchReceiveTimeoutEnabled = this.batchReceiveTimeout > 0; + long startTime = isBatchReceiveTimeoutEnabled ? System.currentTimeMillis() : 0; for (int i = 0; i < this.batchSize; i++) { - boolean batchTimedOut = this.batchReceiveTimeout > 0 && + boolean batchTimedOut = isBatchReceiveTimeoutEnabled && (System.currentTimeMillis() - startTime) > this.batchReceiveTimeout; if (batchTimedOut) { if (logger.isTraceEnabled()) { - long gathered = 0; - if (messages != null) { - gathered = messages.size(); - } + long gathered = messages != null ? messages.size() : 0; logger.trace("Timed out for gathering batch messages. gathered size is " + gathered); } break;