Skip to content

GH-2601: Add a batchReceiveTimeout #2605

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,6 +55,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Johno Crawford
* @author Jeonggi Kim
*
* @since 2.0
*
Expand Down Expand Up @@ -166,6 +167,8 @@ public class ListenerContainerFactoryBean extends AbstractFactoryBean<AbstractMe

private Long receiveTimeout;

private Long batchReceiveTimeout;

private Integer batchSize;

private Integer declarationRetries;
Expand Down Expand Up @@ -389,6 +392,18 @@ public void setReceiveTimeout(long receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}

/**
* The number of milliseconds of timeout for gathering batch messages.
* It limits the time to wait to fill batchSize.
* Default is 0 (no timeout).
* @param batchReceiveTimeout the timeout for gathering batch messages.
* @since 3.1.2
* @see #setBatchSize(int)
*/
public void setBatchReceiveTimeout(long batchReceiveTimeout) {
this.batchReceiveTimeout = batchReceiveTimeout;
}

/**
* This property has several functions.
* <p>
Expand Down Expand Up @@ -552,6 +567,7 @@ private AbstractMessageListenerContainer createContainer() {
.acceptIfNotNull(this.consecutiveActiveTrigger, container::setConsecutiveActiveTrigger)
.acceptIfNotNull(this.consecutiveIdleTrigger, container::setConsecutiveIdleTrigger)
.acceptIfNotNull(this.receiveTimeout, container::setReceiveTimeout)
.acceptIfNotNull(this.batchReceiveTimeout, container::setBatchReceiveTimeout)
.acceptIfNotNull(this.batchSize, container::setBatchSize)
.acceptIfNotNull(this.consumerBatchEnabled, container::setConsumerBatchEnabled)
.acceptIfNotNull(this.declarationRetries, container::setDeclarationRetries)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2022 the original author or authors.
* Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Dustin Schultz
* @author Jeonggi Kim
*
* @since 1.4
*/
Expand All @@ -54,6 +55,8 @@ public class SimpleRabbitListenerContainerFactory

private Long receiveTimeout;

private Long batchReceiveTimeout;

private Boolean consumerBatchEnabled;

/**
Expand Down Expand Up @@ -121,6 +124,19 @@ public void setReceiveTimeout(Long receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}

/**
* The number of milliseconds of timeout for gathering batch messages.
* It limits the time to wait to fill batchSize.
* Default is 0 (no timeout).
* @param batchReceiveTimeout the timeout for gathering batch messages.
* @since 3.1.2
* @see SimpleMessageListenerContainer#setBatchReceiveTimeout
* @see #setBatchSize(Integer)
*/
public void setBatchReceiveTimeout(Long batchReceiveTimeout) {
this.batchReceiveTimeout = batchReceiveTimeout;
}

/**
* Set to true to present a list of messages based on the {@link #setBatchSize(Integer)},
* if the listener supports it. Starting with version 3.0, setting this to true will
Expand Down Expand Up @@ -163,7 +179,8 @@ protected void initializeContainer(SimpleMessageListenerContainer instance, Rabb
.acceptIfNotNull(this.stopConsumerMinInterval, instance::setStopConsumerMinInterval)
.acceptIfNotNull(this.consecutiveActiveTrigger, instance::setConsecutiveActiveTrigger)
.acceptIfNotNull(this.consecutiveIdleTrigger, instance::setConsecutiveIdleTrigger)
.acceptIfNotNull(this.receiveTimeout, instance::setReceiveTimeout);
.acceptIfNotNull(this.receiveTimeout, instance::setReceiveTimeout)
.acceptIfNotNull(this.batchReceiveTimeout, instance::setBatchReceiveTimeout);
if (Boolean.TRUE.equals(this.consumerBatchEnabled)) {
instance.setConsumerBatchEnabled(true);
/*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -80,6 +80,7 @@
* @author Mat Jaggard
* @author Yansong Ren
* @author Tim Bourquin
* @author Jeonggi Kim
*
* @since 1.0
*/
Expand Down Expand Up @@ -121,6 +122,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta

private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;

private long batchReceiveTimeout;

private Set<BlockingQueueConsumer> consumers;

private Integer declarationRetries;
Expand Down Expand Up @@ -330,6 +333,19 @@ public void setReceiveTimeout(long receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}

/**
* The number of milliseconds of timeout for gathering batch messages.
* It limits the time to wait to fill batchSize.
* Default is 0 (no timeout).
* @param batchReceiveTimeout the timeout for gathering batch messages.
* @since 3.1.2
* @see #setBatchSize(int)
*/
public void setBatchReceiveTimeout(long batchReceiveTimeout) {
Assert.isTrue(batchReceiveTimeout >= 0, "'batchReceiveTimeout' must be >= 0");
this.batchReceiveTimeout = batchReceiveTimeout;
}

/**
* This property has several functions.
* <p>
Expand Down Expand Up @@ -996,8 +1012,18 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep

List<Message> messages = null;
long deliveryTag = 0;

boolean isBatchReceiveTimeoutEnabled = this.batchReceiveTimeout > 0;
long startTime = isBatchReceiveTimeoutEnabled ? System.currentTimeMillis() : 0;
for (int i = 0; i < this.batchSize; i++) {
boolean batchTimedOut = isBatchReceiveTimeoutEnabled &&
(System.currentTimeMillis() - startTime) > this.batchReceiveTimeout;
if (batchTimedOut) {
if (logger.isTraceEnabled()) {
long gathered = messages != null ? messages.size() : 0;
logger.trace("Timed out for gathering batch messages. gathered size is " + gathered);
}
break;
}

logger.trace("Waiting for message from consumer.");
Message message = consumer.nextMessage(this.receiveTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
* @author Mohammad Hewedy
* @author Yansong Ren
* @author Tim Bourquin
* @author Jeonggi Kim
*/
public class SimpleMessageListenerContainerTests {

Expand Down Expand Up @@ -784,6 +785,59 @@ void testWithConsumerStartWhenNotActive() {
assertThat(start.getCount()).isEqualTo(0L);
}

@Test
public void testBatchReceiveTimedOut() throws Exception {
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
Connection connection = mock(Connection.class);
Channel channel = mock(Channel.class);
given(connectionFactory.createConnection()).willReturn(connection);
given(connection.createChannel(false)).willReturn(channel);
final AtomicReference<Consumer> consumer = new AtomicReference<>();
willAnswer(invocation -> {
consumer.set(invocation.getArgument(6));
consumer.get().handleConsumeOk("1");
return "1";
}).given(channel)
.basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyMap(),
any(Consumer.class));
final CountDownLatch latch = new CountDownLatch(2);
willAnswer(invocation -> {
latch.countDown();
return null;
}).given(channel).basicAck(anyLong(), anyBoolean());

final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setAfterReceivePostProcessors(msg -> null);
container.setQueueNames("foo");
MessageListener listener = mock(BatchMessageListener.class);
container.setMessageListener(listener);
container.setBatchSize(3);
container.setConsumerBatchEnabled(true);
container.setReceiveTimeout(10);
container.setBatchReceiveTimeout(20);
container.start();

BasicProperties props = new BasicProperties();
byte[] payload = "baz".getBytes();
Envelope envelope = new Envelope(1L, false, "foo", "bar");
consumer.get().handleDelivery("1", envelope, props, payload);
envelope = new Envelope(2L, false, "foo", "bar");
consumer.get().handleDelivery("1", envelope, props, payload);
// waiting for batch receive timed out
Thread.sleep(20);
envelope = new Envelope(3L, false, "foo", "bar");
consumer.get().handleDelivery("1", envelope, props, payload);
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
verify(channel, never()).basicAck(eq(1), anyBoolean());
verify(channel).basicAck(2, true);
verify(channel, never()).basicAck(eq(2), anyBoolean());
verify(channel).basicAck(3, true);
container.stop();
verify(listener).containerAckMode(AcknowledgeMode.AUTO);
verify(listener).isAsyncReplies();
verifyNoMoreInteractions(listener);
}

private Answer<Object> messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container,
final boolean cancel, final CountDownLatch latch) {
return invocation -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ a|
|[[consumerBatchEnabled]]<<consumerBatchEnabled,`consumerBatchEnabled`>> +
(batch-enabled)

|If the `MessageListener` supports it, setting this to true enables batching of discrete messages, up to `batchSize`; a partial batch will be delivered if no new messages arrive in `receiveTimeout`.
|If the `MessageListener` supports it, setting this to true enables batching of discrete messages, up to `batchSize`; a partial batch will be delivered if no new messages arrive in `receiveTimeout` or gathering batch messages time exceeded `batchReceiveTimeout`.
When this is false, batching is only supported for batches created by a producer; see xref:amqp/sending-messages.adoc#template-batching[Batching].

a|image::tickmark.png[]
Expand Down Expand Up @@ -611,6 +611,18 @@ a|image::tickmark.png[]
a|
a|

|[[batchReceiveTimeout]]<<batchReceiveTimeout,`batchReceiveTimeout`>> +
(batch-receive-timeout)

|The number of milliseconds of timeout for gathering batch messages.
It limits the time to wait to fill batchSize.
When `batchSize > 1` and the time to gathering batch messages is greater than `batchReceiveTime`, batch will be delivered.
Default is 0 (no timeout).

a|image::tickmark.png[]
a|
a|

|[[recoveryBackOff]]<<recoveryBackOff,`recoveryBackOff`>> +
(recovery-back-off)

Expand Down