Skip to content

Commit 9e95303

Browse files
committed
spring-projectsGH-637: Fix spurious nonresponsive consumer events
Fixes spring-projects#637 Wrong timestamp used for event publication causing invalid events.
1 parent 3706405 commit 9e95303

File tree

2 files changed

+38
-1
lines changed

2 files changed

+38
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
415415

416416
private boolean consumerPaused;
417417

418+
private volatile long lastPoll = System.currentTimeMillis();
419+
418420
@SuppressWarnings("unchecked")
419421
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
420422
Assert.state(!this.isAnyManualAck || !this.autoCommit,
@@ -502,7 +504,7 @@ else if (listener instanceof MessageListener) {
502504
}
503505

504506
protected void checkConsumer() {
505-
long timeSinceLastPoll = System.currentTimeMillis() - last;
507+
long timeSinceLastPoll = System.currentTimeMillis() - this.lastPoll;
506508
if (((float) timeSinceLastPoll) / (float) this.containerProperties.getPollTimeout()
507509
> this.containerProperties.getNoPollThreshold()) {
508510
publishNonResponsiveConsumerEvent(timeSinceLastPoll, this.consumer);
@@ -695,6 +697,7 @@ public void run() {
695697
publishConsumerPausedEvent(this.consumer.assignment());
696698
}
697699
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
700+
this.lastPoll = System.currentTimeMillis();
698701
if (this.consumerPaused && !isPaused()) {
699702
if (this.logger.isDebugEnabled()) {
700703
this.logger.debug("Resuming consumption from: " + this.consumer.paused());

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

+34
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,40 @@ public void testNonResponsiveConsumerEvent() throws Exception {
664664
container.stop();
665665
}
666666

667+
@SuppressWarnings({ "unchecked", "rawtypes" })
668+
@Test
669+
public void testNonResponsiveConsumerEventNotIssuedWithActiveConsumer() throws Exception {
670+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
671+
Consumer<Integer, String> consumer = mock(Consumer.class);
672+
given(cf.createConsumer(isNull(), eq(""), isNull())).willReturn(consumer);
673+
ConsumerRecords records = new ConsumerRecords(Collections.emptyMap());
674+
CountDownLatch latch = new CountDownLatch(20);
675+
given(consumer.poll(anyLong())).willAnswer(i -> {
676+
Thread.sleep(100);
677+
latch.countDown();
678+
return records;
679+
});
680+
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
681+
new TopicPartitionInitialOffset("foo", 0) };
682+
ContainerProperties containerProps = new ContainerProperties(topicPartition);
683+
containerProps.setNoPollThreshold(2.0f);
684+
containerProps.setPollTimeout(100);
685+
containerProps.setMonitorInterval(1);
686+
containerProps.setMessageListener(mock(MessageListener.class));
687+
KafkaMessageListenerContainer<Integer, String> container =
688+
new KafkaMessageListenerContainer<>(cf, containerProps);
689+
final AtomicInteger eventCounter = new AtomicInteger();
690+
container.setApplicationEventPublisher(e -> {
691+
if (e instanceof NonResponsiveConsumerEvent) {
692+
eventCounter.incrementAndGet();
693+
}
694+
});
695+
container.start();
696+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
697+
container.stop();
698+
assertThat(eventCounter.get()).isEqualTo(0);
699+
}
700+
667701
@Test
668702
public void testBatchAck() throws Exception {
669703
logger.info("Start batch ack");

0 commit comments

Comments
 (0)