Skip to content

Commit 42c83b1

Browse files
committed
PR review
1 parent 433e3c6 commit 42c83b1

File tree

3 files changed

+29
-10
lines changed

3 files changed

+29
-10
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java

+8
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,14 @@ public void setReceiveTimeout(long receiveTimeout) {
392392
this.receiveTimeout = receiveTimeout;
393393
}
394394

395+
/**
396+
* The number of milliseconds of timeout for gathering batch messages.
397+
* It limits the time to wait to fill batchSize.
398+
* Default is 0 (no timeout).
399+
* @param batchReceiveTimeout the timeout for gathering batch messages.
400+
* @since 3.1.2
401+
* @see #setBatchSize(int)
402+
*/
395403
public void setBatchReceiveTimeout(long batchReceiveTimeout) {
396404
this.batchReceiveTimeout = batchReceiveTimeout;
397405
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,13 @@ public void setReceiveTimeout(Long receiveTimeout) {
125125
}
126126

127127
/**
128-
* @param batchReceiveTimeout the timeout for gather batch messages.
128+
* The number of milliseconds of timeout for gathering batch messages.
129+
* It limits the time to wait to fill batchSize.
130+
* Default is 0 (no timeout).
131+
* @param batchReceiveTimeout the timeout for gathering batch messages.
132+
* @since 3.1.2
129133
* @see SimpleMessageListenerContainer#setBatchReceiveTimeout
134+
* @see #setBatchSize(Integer)
130135
*/
131136
public void setBatchReceiveTimeout(Long batchReceiveTimeout) {
132137
this.batchReceiveTimeout = batchReceiveTimeout;

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

+15-9
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,6 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
9898

9999
private static final int DEFAULT_CONSECUTIVE_IDLE_TRIGGER = 10;
100100

101-
private static final long DEFAULT_BATCH_RECEIVE_TIMEOUT = 3000;
102-
103101
public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
104102

105103
private final AtomicLong lastNoMessageAlert = new AtomicLong();
@@ -124,7 +122,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
124122

125123
private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
126124

127-
private long batchReceiveTimeout = DEFAULT_BATCH_RECEIVE_TIMEOUT;
125+
private long batchReceiveTimeout;
128126

129127
private Set<BlockingQueueConsumer> consumers;
130128

@@ -336,12 +334,15 @@ public void setReceiveTimeout(long receiveTimeout) {
336334
}
337335

338336
/**
339-
* The time (in milliseconds) that a waiting time to fill batch size. Default
340-
* 3000 (3 second).
341-
* @param batchReceiveTimeout the timeout
337+
* The number of milliseconds of timeout for gathering batch messages.
338+
* It limits the time to wait to fill batchSize.
339+
* Default is 0 (no timeout).
340+
* @param batchReceiveTimeout the timeout for gathering batch messages.
341+
* @since 3.1.2
342+
* @see #setBatchSize(int)
342343
*/
343344
public void setBatchReceiveTimeout(long batchReceiveTimeout) {
344-
Assert.isTrue(batchReceiveTimeout > 0, "'batchReceiveTimeout' must be > 0");
345+
Assert.isTrue(batchReceiveTimeout >= 0, "'batchReceiveTimeout' must be >= 0");
345346
this.batchReceiveTimeout = batchReceiveTimeout;
346347
}
347348

@@ -1014,9 +1015,14 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
10141015
long startTime = System.currentTimeMillis();
10151016

10161017
for (int i = 0; i < this.batchSize; i++) {
1017-
boolean batchTimedOut = (System.currentTimeMillis() - startTime) > this.batchReceiveTimeout;
1018+
boolean batchTimedOut = this.batchReceiveTimeout > 0 &&
1019+
(System.currentTimeMillis() - startTime) > this.batchReceiveTimeout;
10181020
if (batchTimedOut) {
1019-
logger.trace("Timed out for gather batch messages.");
1021+
long gathered = 0;
1022+
if (messages != null) {
1023+
gathered = messages.size();
1024+
}
1025+
logger.trace("Timed out for gathering batch messages. gathered size is " + gathered);
10201026
break;
10211027
}
10221028

0 commit comments

Comments
 (0)