Skip to content

Commit 7166859

Browse files
authored
spring-projectsGH-2601: Add a batchReceiveTimeout (spring-projects#2605)
Fixes: spring-projects#2601 Stop to waiting next message and execute listener when `batchReceiveTimeout` is timed out. * Add `batchReceiveTimeout` to the `SimpleMessageListenerContainer` configuration.
1 parent 7ad35b5 commit 7166859

File tree

5 files changed

+131
-6
lines changed

5 files changed

+131
-6
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -55,6 +55,7 @@
5555
* @author Gary Russell
5656
* @author Artem Bilan
5757
* @author Johno Crawford
58+
* @author Jeonggi Kim
5859
*
5960
* @since 2.0
6061
*
@@ -166,6 +167,8 @@ public class ListenerContainerFactoryBean extends AbstractFactoryBean<AbstractMe
166167

167168
private Long receiveTimeout;
168169

170+
private Long batchReceiveTimeout;
171+
169172
private Integer batchSize;
170173

171174
private Integer declarationRetries;
@@ -389,6 +392,18 @@ public void setReceiveTimeout(long receiveTimeout) {
389392
this.receiveTimeout = receiveTimeout;
390393
}
391394

395+
/**
396+
* The number of milliseconds of timeout for gathering batch messages.
397+
* It limits the time to wait to fill batchSize.
398+
* Default is 0 (no timeout).
399+
* @param batchReceiveTimeout the timeout for gathering batch messages.
400+
* @since 3.1.2
401+
* @see #setBatchSize(int)
402+
*/
403+
public void setBatchReceiveTimeout(long batchReceiveTimeout) {
404+
this.batchReceiveTimeout = batchReceiveTimeout;
405+
}
406+
392407
/**
393408
* This property has several functions.
394409
* <p>
@@ -552,6 +567,7 @@ private AbstractMessageListenerContainer createContainer() {
552567
.acceptIfNotNull(this.consecutiveActiveTrigger, container::setConsecutiveActiveTrigger)
553568
.acceptIfNotNull(this.consecutiveIdleTrigger, container::setConsecutiveIdleTrigger)
554569
.acceptIfNotNull(this.receiveTimeout, container::setReceiveTimeout)
570+
.acceptIfNotNull(this.batchReceiveTimeout, container::setBatchReceiveTimeout)
555571
.acceptIfNotNull(this.batchSize, container::setBatchSize)
556572
.acceptIfNotNull(this.consumerBatchEnabled, container::setConsumerBatchEnabled)
557573
.acceptIfNotNull(this.declarationRetries, container::setDeclarationRetries)

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java

+19-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2022 the original author or authors.
2+
* Copyright 2014-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,6 +32,7 @@
3232
* @author Gary Russell
3333
* @author Artem Bilan
3434
* @author Dustin Schultz
35+
* @author Jeonggi Kim
3536
*
3637
* @since 1.4
3738
*/
@@ -54,6 +55,8 @@ public class SimpleRabbitListenerContainerFactory
5455

5556
private Long receiveTimeout;
5657

58+
private Long batchReceiveTimeout;
59+
5760
private Boolean consumerBatchEnabled;
5861

5962
/**
@@ -121,6 +124,19 @@ public void setReceiveTimeout(Long receiveTimeout) {
121124
this.receiveTimeout = receiveTimeout;
122125
}
123126

127+
/**
128+
* The number of milliseconds of timeout for gathering batch messages.
129+
* It limits the time to wait to fill batchSize.
130+
* Default is 0 (no timeout).
131+
* @param batchReceiveTimeout the timeout for gathering batch messages.
132+
* @since 3.1.2
133+
* @see SimpleMessageListenerContainer#setBatchReceiveTimeout
134+
* @see #setBatchSize(Integer)
135+
*/
136+
public void setBatchReceiveTimeout(Long batchReceiveTimeout) {
137+
this.batchReceiveTimeout = batchReceiveTimeout;
138+
}
139+
124140
/**
125141
* Set to true to present a list of messages based on the {@link #setBatchSize(Integer)},
126142
* if the listener supports it. Starting with version 3.0, setting this to true will
@@ -163,7 +179,8 @@ protected void initializeContainer(SimpleMessageListenerContainer instance, Rabb
163179
.acceptIfNotNull(this.stopConsumerMinInterval, instance::setStopConsumerMinInterval)
164180
.acceptIfNotNull(this.consecutiveActiveTrigger, instance::setConsecutiveActiveTrigger)
165181
.acceptIfNotNull(this.consecutiveIdleTrigger, instance::setConsecutiveIdleTrigger)
166-
.acceptIfNotNull(this.receiveTimeout, instance::setReceiveTimeout);
182+
.acceptIfNotNull(this.receiveTimeout, instance::setReceiveTimeout)
183+
.acceptIfNotNull(this.batchReceiveTimeout, instance::setBatchReceiveTimeout);
167184
if (Boolean.TRUE.equals(this.consumerBatchEnabled)) {
168185
instance.setConsumerBatchEnabled(true);
169186
/*

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

+28-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -80,6 +80,7 @@
8080
* @author Mat Jaggard
8181
* @author Yansong Ren
8282
* @author Tim Bourquin
83+
* @author Jeonggi Kim
8384
*
8485
* @since 1.0
8586
*/
@@ -121,6 +122,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
121122

122123
private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
123124

125+
private long batchReceiveTimeout;
126+
124127
private Set<BlockingQueueConsumer> consumers;
125128

126129
private Integer declarationRetries;
@@ -330,6 +333,19 @@ public void setReceiveTimeout(long receiveTimeout) {
330333
this.receiveTimeout = receiveTimeout;
331334
}
332335

336+
/**
337+
* The number of milliseconds of timeout for gathering batch messages.
338+
* It limits the time to wait to fill batchSize.
339+
* Default is 0 (no timeout).
340+
* @param batchReceiveTimeout the timeout for gathering batch messages.
341+
* @since 3.1.2
342+
* @see #setBatchSize(int)
343+
*/
344+
public void setBatchReceiveTimeout(long batchReceiveTimeout) {
345+
Assert.isTrue(batchReceiveTimeout >= 0, "'batchReceiveTimeout' must be >= 0");
346+
this.batchReceiveTimeout = batchReceiveTimeout;
347+
}
348+
333349
/**
334350
* This property has several functions.
335351
* <p>
@@ -996,8 +1012,18 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
9961012

9971013
List<Message> messages = null;
9981014
long deliveryTag = 0;
999-
1015+
boolean isBatchReceiveTimeoutEnabled = this.batchReceiveTimeout > 0;
1016+
long startTime = isBatchReceiveTimeoutEnabled ? System.currentTimeMillis() : 0;
10001017
for (int i = 0; i < this.batchSize; i++) {
1018+
boolean batchTimedOut = isBatchReceiveTimeoutEnabled &&
1019+
(System.currentTimeMillis() - startTime) > this.batchReceiveTimeout;
1020+
if (batchTimedOut) {
1021+
if (logger.isTraceEnabled()) {
1022+
long gathered = messages != null ? messages.size() : 0;
1023+
logger.trace("Timed out for gathering batch messages. gathered size is " + gathered);
1024+
}
1025+
break;
1026+
}
10011027

10021028
logger.trace("Waiting for message from consumer.");
10031029
Message message = consumer.nextMessage(this.receiveTimeout);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java

+54
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
* @author Mohammad Hewedy
111111
* @author Yansong Ren
112112
* @author Tim Bourquin
113+
* @author Jeonggi Kim
113114
*/
114115
public class SimpleMessageListenerContainerTests {
115116

@@ -784,6 +785,59 @@ void testWithConsumerStartWhenNotActive() {
784785
assertThat(start.getCount()).isEqualTo(0L);
785786
}
786787

788+
@Test
789+
public void testBatchReceiveTimedOut() throws Exception {
790+
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
791+
Connection connection = mock(Connection.class);
792+
Channel channel = mock(Channel.class);
793+
given(connectionFactory.createConnection()).willReturn(connection);
794+
given(connection.createChannel(false)).willReturn(channel);
795+
final AtomicReference<Consumer> consumer = new AtomicReference<>();
796+
willAnswer(invocation -> {
797+
consumer.set(invocation.getArgument(6));
798+
consumer.get().handleConsumeOk("1");
799+
return "1";
800+
}).given(channel)
801+
.basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyMap(),
802+
any(Consumer.class));
803+
final CountDownLatch latch = new CountDownLatch(2);
804+
willAnswer(invocation -> {
805+
latch.countDown();
806+
return null;
807+
}).given(channel).basicAck(anyLong(), anyBoolean());
808+
809+
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
810+
container.setAfterReceivePostProcessors(msg -> null);
811+
container.setQueueNames("foo");
812+
MessageListener listener = mock(BatchMessageListener.class);
813+
container.setMessageListener(listener);
814+
container.setBatchSize(3);
815+
container.setConsumerBatchEnabled(true);
816+
container.setReceiveTimeout(10);
817+
container.setBatchReceiveTimeout(20);
818+
container.start();
819+
820+
BasicProperties props = new BasicProperties();
821+
byte[] payload = "baz".getBytes();
822+
Envelope envelope = new Envelope(1L, false, "foo", "bar");
823+
consumer.get().handleDelivery("1", envelope, props, payload);
824+
envelope = new Envelope(2L, false, "foo", "bar");
825+
consumer.get().handleDelivery("1", envelope, props, payload);
826+
// waiting for batch receive timed out
827+
Thread.sleep(20);
828+
envelope = new Envelope(3L, false, "foo", "bar");
829+
consumer.get().handleDelivery("1", envelope, props, payload);
830+
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
831+
verify(channel, never()).basicAck(eq(1), anyBoolean());
832+
verify(channel).basicAck(2, true);
833+
verify(channel, never()).basicAck(eq(2), anyBoolean());
834+
verify(channel).basicAck(3, true);
835+
container.stop();
836+
verify(listener).containerAckMode(AcknowledgeMode.AUTO);
837+
verify(listener).isAsyncReplies();
838+
verifyNoMoreInteractions(listener);
839+
}
840+
787841
private Answer<Object> messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container,
788842
final boolean cancel, final CountDownLatch latch) {
789843
return invocation -> {

src/reference/antora/modules/ROOT/pages/amqp/containerAttributes.adoc

+13-1
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ a|
198198
|[[consumerBatchEnabled]]<<consumerBatchEnabled,`consumerBatchEnabled`>> +
199199
(batch-enabled)
200200

201-
|If the `MessageListener` supports it, setting this to true enables batching of discrete messages, up to `batchSize`; a partial batch will be delivered if no new messages arrive in `receiveTimeout`.
201+
|If the `MessageListener` supports it, setting this to true enables batching of discrete messages, up to `batchSize`; a partial batch will be delivered if no new messages arrive in `receiveTimeout` or gathering batch messages time exceeded `batchReceiveTimeout`.
202202
When this is false, batching is only supported for batches created by a producer; see xref:amqp/sending-messages.adoc#template-batching[Batching].
203203

204204
a|image::tickmark.png[]
@@ -611,6 +611,18 @@ a|image::tickmark.png[]
611611
a|
612612
a|
613613

614+
|[[batchReceiveTimeout]]<<batchReceiveTimeout,`batchReceiveTimeout`>> +
615+
(batch-receive-timeout)
616+
617+
|The number of milliseconds of timeout for gathering batch messages.
618+
It limits the time to wait to fill batchSize.
619+
When `batchSize > 1` and the time to gathering batch messages is greater than `batchReceiveTime`, batch will be delivered.
620+
Default is 0 (no timeout).
621+
622+
a|image::tickmark.png[]
623+
a|
624+
a|
625+
614626
|[[recoveryBackOff]]<<recoveryBackOff,`recoveryBackOff`>> +
615627
(recovery-back-off)
616628

0 commit comments

Comments
 (0)