Skip to content

Commit 32608ba

Browse files
committed
spring-projectsGH-2601: Add a batchReceiveTimeout
Fixes spring-projects#2601 stop to waiting next message and execute listener when batchReceiveTimeout is timed out.
1 parent cb15cf5 commit 32608ba

File tree

4 files changed

+99
-4
lines changed

4 files changed

+99
-4
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -55,6 +55,7 @@
5555
* @author Gary Russell
5656
* @author Artem Bilan
5757
* @author Johno Crawford
58+
* @author Jeonggi Kim
5859
*
5960
* @since 2.0
6061
*
@@ -166,6 +167,8 @@ public class ListenerContainerFactoryBean extends AbstractFactoryBean<AbstractMe
166167

167168
private Long receiveTimeout;
168169

170+
private Long batchReceiveTimeout;
171+
169172
private Integer batchSize;
170173

171174
private Integer declarationRetries;
@@ -389,6 +392,10 @@ public void setReceiveTimeout(long receiveTimeout) {
389392
this.receiveTimeout = receiveTimeout;
390393
}
391394

395+
public void setBatchReceiveTimeout(long batchReceiveTimeout) {
396+
this.batchReceiveTimeout = batchReceiveTimeout;
397+
}
398+
392399
/**
393400
* This property has several functions.
394401
* <p>
@@ -552,6 +559,7 @@ private AbstractMessageListenerContainer createContainer() {
552559
.acceptIfNotNull(this.consecutiveActiveTrigger, container::setConsecutiveActiveTrigger)
553560
.acceptIfNotNull(this.consecutiveIdleTrigger, container::setConsecutiveIdleTrigger)
554561
.acceptIfNotNull(this.receiveTimeout, container::setReceiveTimeout)
562+
.acceptIfNotNull(this.batchReceiveTimeout, container::setBatchReceiveTimeout)
555563
.acceptIfNotNull(this.batchSize, container::setBatchSize)
556564
.acceptIfNotNull(this.consumerBatchEnabled, container::setConsumerBatchEnabled)
557565
.acceptIfNotNull(this.declarationRetries, container::setDeclarationRetries)

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2022 the original author or authors.
2+
* Copyright 2014-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,6 +32,7 @@
3232
* @author Gary Russell
3333
* @author Artem Bilan
3434
* @author Dustin Schultz
35+
* @author Jeonggi Kim
3536
*
3637
* @since 1.4
3738
*/
@@ -54,6 +55,8 @@ public class SimpleRabbitListenerContainerFactory
5455

5556
private Long receiveTimeout;
5657

58+
private Long batchReceiveTimeout;
59+
5760
private Boolean consumerBatchEnabled;
5861

5962
/**
@@ -121,6 +124,14 @@ public void setReceiveTimeout(Long receiveTimeout) {
121124
this.receiveTimeout = receiveTimeout;
122125
}
123126

127+
/**
128+
* @param batchReceiveTimeout the timeout for gather batch messages.
129+
* @see SimpleMessageListenerContainer#setBatchReceiveTimeout
130+
*/
131+
public void setBatchReceiveTimeout(Long batchReceiveTimeout) {
132+
this.batchReceiveTimeout = batchReceiveTimeout;
133+
}
134+
124135
/**
125136
* Set to true to present a list of messages based on the {@link #setBatchSize(Integer)},
126137
* if the listener supports it. Starting with version 3.0, setting this to true will
@@ -163,7 +174,8 @@ protected void initializeContainer(SimpleMessageListenerContainer instance, Rabb
163174
.acceptIfNotNull(this.stopConsumerMinInterval, instance::setStopConsumerMinInterval)
164175
.acceptIfNotNull(this.consecutiveActiveTrigger, instance::setConsecutiveActiveTrigger)
165176
.acceptIfNotNull(this.consecutiveIdleTrigger, instance::setConsecutiveIdleTrigger)
166-
.acceptIfNotNull(this.receiveTimeout, instance::setReceiveTimeout);
177+
.acceptIfNotNull(this.receiveTimeout, instance::setReceiveTimeout)
178+
.acceptIfNotNull(this.batchReceiveTimeout, instance::setBatchReceiveTimeout);
167179
if (Boolean.TRUE.equals(this.consumerBatchEnabled)) {
168180
instance.setConsumerBatchEnabled(true);
169181
/*

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -80,6 +80,7 @@
8080
* @author Mat Jaggard
8181
* @author Yansong Ren
8282
* @author Tim Bourquin
83+
* @author Jeonggi Kim
8384
*
8485
* @since 1.0
8586
*/
@@ -97,6 +98,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
9798

9899
private static final int DEFAULT_CONSECUTIVE_IDLE_TRIGGER = 10;
99100

101+
private static final long DEFAULT_BATCH_RECEIVE_TIMEOUT = 3000;
102+
100103
public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
101104

102105
private final AtomicLong lastNoMessageAlert = new AtomicLong();
@@ -121,6 +124,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
121124

122125
private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
123126

127+
private long batchReceiveTimeout = DEFAULT_BATCH_RECEIVE_TIMEOUT;
128+
124129
private Set<BlockingQueueConsumer> consumers;
125130

126131
private Integer declarationRetries;
@@ -330,6 +335,16 @@ public void setReceiveTimeout(long receiveTimeout) {
330335
this.receiveTimeout = receiveTimeout;
331336
}
332337

338+
/**
339+
* The time (in milliseconds) that a waiting time to fill batch size. Default
340+
* 3000 (3 second).
341+
* @param batchReceiveTimeout the timeout
342+
*/
343+
public void setBatchReceiveTimeout(long batchReceiveTimeout) {
344+
Assert.isTrue(batchReceiveTimeout > 0, "'batchReceiveTimeout' must be > 0");
345+
this.batchReceiveTimeout = batchReceiveTimeout;
346+
}
347+
333348
/**
334349
* This property has several functions.
335350
* <p>
@@ -996,8 +1011,14 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
9961011

9971012
List<Message> messages = null;
9981013
long deliveryTag = 0;
1014+
long startTime = System.currentTimeMillis();
9991015

10001016
for (int i = 0; i < this.batchSize; i++) {
1017+
boolean batchTimedOut = (System.currentTimeMillis() - startTime) > this.batchReceiveTimeout;
1018+
if (batchTimedOut) {
1019+
logger.trace("Timed out for gather batch messages.");
1020+
break;
1021+
}
10011022

10021023
logger.trace("Waiting for message from consumer.");
10031024
Message message = consumer.nextMessage(this.receiveTimeout);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java

+54
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
* @author Mohammad Hewedy
111111
* @author Yansong Ren
112112
* @author Tim Bourquin
113+
* @author Jeonggi Kim
113114
*/
114115
public class SimpleMessageListenerContainerTests {
115116

@@ -784,6 +785,59 @@ void testWithConsumerStartWhenNotActive() {
784785
assertThat(start.getCount()).isEqualTo(0L);
785786
}
786787

788+
@Test
789+
public void testBatchReceiveTimedOut() throws Exception {
790+
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
791+
Connection connection = mock(Connection.class);
792+
Channel channel = mock(Channel.class);
793+
given(connectionFactory.createConnection()).willReturn(connection);
794+
given(connection.createChannel(false)).willReturn(channel);
795+
final AtomicReference<Consumer> consumer = new AtomicReference<>();
796+
willAnswer(invocation -> {
797+
consumer.set(invocation.getArgument(6));
798+
consumer.get().handleConsumeOk("1");
799+
return "1";
800+
}).given(channel)
801+
.basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyMap(),
802+
any(Consumer.class));
803+
final CountDownLatch latch = new CountDownLatch(2);
804+
willAnswer(invocation -> {
805+
latch.countDown();
806+
return null;
807+
}).given(channel).basicAck(anyLong(), anyBoolean());
808+
809+
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
810+
container.setAfterReceivePostProcessors(msg -> null);
811+
container.setQueueNames("foo");
812+
MessageListener listener = mock(BatchMessageListener.class);
813+
container.setMessageListener(listener);
814+
container.setBatchSize(3);
815+
container.setConsumerBatchEnabled(true);
816+
container.setReceiveTimeout(10);
817+
container.setBatchReceiveTimeout(20);
818+
container.start();
819+
820+
BasicProperties props = new BasicProperties();
821+
byte[] payload = "baz".getBytes();
822+
Envelope envelope = new Envelope(1L, false, "foo", "bar");
823+
consumer.get().handleDelivery("1", envelope, props, payload);
824+
envelope = new Envelope(2L, false, "foo", "bar");
825+
consumer.get().handleDelivery("1", envelope, props, payload);
826+
// waiting for batch receive timed out
827+
Thread.sleep(20);
828+
envelope = new Envelope(3L, false, "foo", "bar");
829+
consumer.get().handleDelivery("1", envelope, props, payload);
830+
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
831+
verify(channel, never()).basicAck(eq(1), anyBoolean());
832+
verify(channel).basicAck(2, true);
833+
verify(channel, never()).basicAck(eq(2), anyBoolean());
834+
verify(channel).basicAck(3, true);
835+
container.stop();
836+
verify(listener).containerAckMode(AcknowledgeMode.AUTO);
837+
verify(listener).isAsyncReplies();
838+
verifyNoMoreInteractions(listener);
839+
}
840+
787841
private Answer<Object> messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container,
788842
final boolean cancel, final CountDownLatch latch) {
789843
return invocation -> {

0 commit comments

Comments
 (0)