Skip to content

Commit 4c5399d

Browse files
garyrussellartembilan
authored andcommitted
GH-637: Fix spurious nonresponsive consumer events
Fixes #637 Wrong timestamp used for event publication causing invalid events. # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java * Fixed `KafkaMessageListenerContainerTests` for Java 7 and appropriate Mockito version
1 parent 6943d5f commit 4c5399d

File tree

2 files changed

+50
-2
lines changed

2 files changed

+50
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
359359

360360
private boolean taskSchedulerExplicitlySet;
361361

362+
private volatile long lastPoll = System.currentTimeMillis();
363+
362364
@SuppressWarnings("unchecked")
363365
ListenerConsumer(GenericMessageListener<?> listener, GenericAcknowledgingMessageListener<?> ackListener) {
364366
Assert.state(!this.isAnyManualAck || !this.autoCommit,
@@ -461,7 +463,7 @@ public void run() {
461463
}
462464

463465
protected void checkConsumer() {
464-
long timeSinceLastPoll = System.currentTimeMillis() - last;
466+
long timeSinceLastPoll = System.currentTimeMillis() - this.lastPoll;
465467
if (((float) timeSinceLastPoll) / (float) this.containerProperties.getPollTimeout()
466468
> this.containerProperties.getNoPollThreshold()) {
467469
publishNonResponsiveConsumerEvent(timeSinceLastPoll, this.consumer);
@@ -624,6 +626,8 @@ public void run() {
624626
}
625627
processSeeks();
626628
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
629+
this.lastPoll = System.currentTimeMillis();
630+
627631
if (records != null && this.logger.isDebugEnabled()) {
628632
this.logger.debug("Received: " + records.count() + " records");
629633
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

+45-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -430,6 +430,50 @@ public void publishEvent(ApplicationEvent event) {
430430
container.stop();
431431
}
432432

433+
@SuppressWarnings({ "unchecked", "rawtypes" })
434+
@Test
435+
public void testNonResponsiveConsumerEventNotIssuedWithActiveConsumer() throws Exception {
436+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
437+
Consumer<Integer, String> consumer = mock(Consumer.class);
438+
given(cf.createConsumer(anyString(), eq(""))).willReturn(consumer);
439+
ConsumerRecords records = new ConsumerRecords(Collections.emptyMap());
440+
CountDownLatch latch = new CountDownLatch(20);
441+
given(consumer.poll(anyLong())).willAnswer(i -> {
442+
Thread.sleep(100);
443+
latch.countDown();
444+
return records;
445+
});
446+
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
447+
new TopicPartitionInitialOffset("foo", 0) };
448+
ContainerProperties containerProps = new ContainerProperties(topicPartition);
449+
containerProps.setNoPollThreshold(2.0f);
450+
containerProps.setPollTimeout(100);
451+
containerProps.setMonitorInterval(1);
452+
containerProps.setMessageListener(mock(MessageListener.class));
453+
KafkaMessageListenerContainer<Integer, String> container =
454+
new KafkaMessageListenerContainer<>(cf, containerProps);
455+
final AtomicInteger eventCounter = new AtomicInteger();
456+
container.setApplicationEventPublisher(new ApplicationEventPublisher() {
457+
458+
@Override
459+
public void publishEvent(Object e) {
460+
if (e instanceof NonResponsiveConsumerEvent) {
461+
eventCounter.incrementAndGet();
462+
}
463+
}
464+
465+
@Override
466+
public void publishEvent(ApplicationEvent event) {
467+
publishEvent((Object) event);
468+
}
469+
470+
});
471+
container.start();
472+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
473+
container.stop();
474+
assertThat(eventCounter.get()).isEqualTo(0);
475+
}
476+
433477
@Test
434478
public void testBatchAck() throws Exception {
435479
logger.info("Start batch ack");

0 commit comments

Comments
 (0)