Skip to content

Commit b688311

Browse files
garyrussellartembilan
authored andcommitted
GH-637: Fix spurious nonresponsive consumer events
Fixes #637 Wrong timestamp used for event publication causing invalid events. # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
1 parent 15cb937 commit b688311

File tree

2 files changed

+39
-1
lines changed

2 files changed

+39
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
357357

358358
private boolean taskSchedulerExplicitlySet;
359359

360+
private volatile long lastPoll = System.currentTimeMillis();
361+
360362
@SuppressWarnings("unchecked")
361363
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
362364
Assert.state(!this.isAnyManualAck || !this.autoCommit,
@@ -437,7 +439,7 @@ else if (listener instanceof MessageListener) {
437439
}
438440

439441
protected void checkConsumer() {
440-
long timeSinceLastPoll = System.currentTimeMillis() - last;
442+
long timeSinceLastPoll = System.currentTimeMillis() - this.lastPoll;
441443
if (((float) timeSinceLastPoll) / (float) this.containerProperties.getPollTimeout()
442444
> this.containerProperties.getNoPollThreshold()) {
443445
publishNonResponsiveConsumerEvent(timeSinceLastPoll, this.consumer);
@@ -619,6 +621,8 @@ public void run() {
619621
}
620622
processSeeks();
621623
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
624+
this.lastPoll = System.currentTimeMillis();
625+
622626
if (records != null && this.logger.isDebugEnabled()) {
623627
this.logger.debug("Received: " + records.count() + " records");
624628
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

+34
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,40 @@ public void testBrokerDownEvent() throws Exception {
647647
container.stop();
648648
}
649649

650+
@SuppressWarnings({ "unchecked", "rawtypes" })
651+
@Test
652+
public void testNonResponsiveConsumerEventNotIssuedWithActiveConsumer() throws Exception {
653+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
654+
Consumer<Integer, String> consumer = mock(Consumer.class);
655+
given(cf.createConsumer(isNull(), eq(""))).willReturn(consumer);
656+
ConsumerRecords records = new ConsumerRecords(Collections.emptyMap());
657+
CountDownLatch latch = new CountDownLatch(20);
658+
given(consumer.poll(anyLong())).willAnswer(i -> {
659+
Thread.sleep(100);
660+
latch.countDown();
661+
return records;
662+
});
663+
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
664+
new TopicPartitionInitialOffset("foo", 0) };
665+
ContainerProperties containerProps = new ContainerProperties(topicPartition);
666+
containerProps.setNoPollThreshold(2.0f);
667+
containerProps.setPollTimeout(100);
668+
containerProps.setMonitorInterval(1);
669+
containerProps.setMessageListener(mock(MessageListener.class));
670+
KafkaMessageListenerContainer<Integer, String> container =
671+
new KafkaMessageListenerContainer<>(cf, containerProps);
672+
final AtomicInteger eventCounter = new AtomicInteger();
673+
container.setApplicationEventPublisher(e -> {
674+
if (e instanceof NonResponsiveConsumerEvent) {
675+
eventCounter.incrementAndGet();
676+
}
677+
});
678+
container.start();
679+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
680+
container.stop();
681+
assertThat(eventCounter.get()).isEqualTo(0);
682+
}
683+
650684
@Test
651685
public void testBatchAck() throws Exception {
652686
logger.info("Start batch ack");

0 commit comments

Comments
 (0)