Skip to content

Commit b19ab35

Browse files
garyrussellartembilan
authored andcommitted
GH-804: DRTMLC - don't cancel in-use consumer
Fixes #804 The `DirectReplyToMessageListenerContainer` shuts down consumers that haven't been used for the `idleEventInterval`. However, the superclass simply cancels the first consumer in the list, which may actually being used in the DRTMLC. Change the logic to find a consumer that is not actually being used. **cherry-pick to 2.0.x** * Polishing - PR Comments
1 parent 66847b6 commit b19ab35

File tree

3 files changed

+44
-8
lines changed

3 files changed

+44
-8
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.concurrent.ScheduledFuture;
3434
import java.util.concurrent.TimeUnit;
3535
import java.util.stream.Collectors;
36-
import java.util.stream.IntStream;
3736
import java.util.stream.Stream;
3837

3938
import org.apache.commons.logging.Log;
@@ -311,14 +310,31 @@ private void adjustConsumers(int newCount) {
311310
}
312311
List<SimpleConsumer> consumerList = this.consumersByQueue.get(queue);
313312
if (consumerList != null && consumerList.size() > newCount) {
314-
IntStream.range(newCount, consumerList.size())
315-
.mapToObj(i -> consumerList.remove(0))
316-
.forEach(this::cancelConsumer);
313+
int delta = consumerList.size() - newCount;
314+
for (int i = 0; i < delta; i++) {
315+
int index = findIdleConsumer();
316+
if (index >= 0) {
317+
SimpleConsumer consumer = consumerList.remove(index);
318+
if (consumer != null) {
319+
cancelConsumer(consumer);
320+
}
321+
}
322+
}
317323
}
318324
}
319325
}
320326
}
321327

328+
/**
329+
* When adjusting down, return a consumer that can be canceled. Called while
330+
* synchronized on consumersMonitor.
331+
* @return the consumer index or -1 if non idle.
332+
* @since 2.0.6
333+
*/
334+
protected int findIdleConsumer() {
335+
return 0;
336+
}
337+
322338
private void checkStartState() {
323339
if (!this.isRunning()) {
324340
try {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectReplyToMessageListenerContainer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,16 @@ protected void processMonitorTask() {
147147
}
148148
}
149149

150+
@Override
151+
protected int findIdleConsumer() {
152+
for (int i = 0; i < this.consumers.size(); i++) {
153+
if (!this.inUseConsumerChannels.containsValue(this.consumers.get(i))) {
154+
return i;
155+
}
156+
}
157+
return -1;
158+
}
159+
150160
@Override
151161
protected void consumerRemoved(SimpleConsumer consumer) {
152162
this.inUseConsumerChannels.remove(consumer.getChannel());

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerIntegrationTests.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -551,12 +551,22 @@ public void testReplyToConsumersReduced() throws Exception {
551551
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
552552
DirectReplyToMessageListenerContainer container = new DirectReplyToMessageListenerContainer(cf);
553553
container.setBeanName("reducing");
554-
container.setIdleEventInterval(500);
554+
container.setIdleEventInterval(100);
555+
CountDownLatch latch = new CountDownLatch(5);
556+
container.setApplicationEventPublisher(e -> {
557+
if (e instanceof ListenerContainerIdleEvent) {
558+
latch.countDown();
559+
}
560+
});
555561
container.afterPropertiesSet();
556562
container.start();
557-
ChannelHolder channelHolder = container.getChannelHolder();
558-
assertTrue(activeConsumerCount(container, 1));
559-
container.releaseConsumerFor(channelHolder, false, null);
563+
ChannelHolder channelHolder1 = container.getChannelHolder();
564+
ChannelHolder channelHolder2 = container.getChannelHolder();
565+
assertTrue(activeConsumerCount(container, 2));
566+
container.releaseConsumerFor(channelHolder2, false, null);
567+
assertTrue(latch.await(10, TimeUnit.SECONDS));
568+
assertTrue(channelHolder1.getChannel().isOpen());
569+
container.releaseConsumerFor(channelHolder1, false, null);
560570
assertTrue(activeConsumerCount(container, 0));
561571
container.stop();
562572
cf.destroy();

0 commit comments

Comments
 (0)