Spring Kafka - Consumers Detecting Broker Unavailable #637

Closed
yihfenglow opened this issue Apr 3, 2018 · 4 comments

@yihfenglow

I understand from https://stackoverflow.com/questions/47524315/handle-failure-in-case-the-kafka-broker-is-down that since 1.3.2 there is a `NonResponsiveConsumerEvent` that is published when the broker is unavailable.

When I tried this, the event was published whether or not the consumer's polling was responsive.

Digging deeper into the code, I see that the `last` variable in `KafkaMessageListenerContainer.java` is not updated in the `processCommits()` method when the ack mode is manual, batch, record, or count. Because `last` is never updated in those modes, it looks like the `checkConsumer()` method in the same class will always trigger the `NonResponsiveConsumerEvent`.

Could you please take a look to see whether this is a bug?
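
To make the suspected failure mode concrete, here is a minimal, self-contained model of it. This is not the spring-kafka source: the class `StaleTimestampModel`, its two-value `AckMode` enum, and the millisecond threshold are all hypothetical simplifications, and the model assumes every poll in the time-based mode also commits. It only shows how a monitor that reads a timestamp refreshed by one ack mode will report a healthy consumer as stuck in the other modes.

```java
// Simplified model of the behavior described above; NOT the actual spring-kafka code.
class StaleTimestampModel {
    enum AckMode { TIME, MANUAL }

    private final AckMode ackMode;
    private final long thresholdMs;
    private long last; // in this model, refreshed only by time-based commits

    StaleTimestampModel(AckMode ackMode, long thresholdMs, long startMs) {
        this.ackMode = ackMode;
        this.thresholdMs = thresholdMs;
        this.last = startMs;
    }

    /** One successful poll at nowMs, followed by commit handling. */
    void pollAndProcessCommits(long nowMs) {
        if (ackMode == AckMode.TIME) {
            last = nowMs; // the only place 'last' moves forward
        }
        // every other ack mode leaves 'last' untouched
    }

    /** Monitor check: true means a "non-responsive" event would be published. */
    boolean checkConsumer(long nowMs) {
        return nowMs - last > thresholdMs;
    }

    public static void main(String[] args) {
        for (AckMode mode : AckMode.values()) {
            StaleTimestampModel m = new StaleTimestampModel(mode, 3000, 0);
            // Poll successfully once per simulated second for ten seconds.
            for (long t = 1000; t <= 10000; t += 1000) {
                m.pollAndProcessCommits(t);
            }
            System.out.println(mode + " -> event fired: " + m.checkConsumer(10000));
        }
        // Prints:
        // TIME -> event fired: false
        // MANUAL -> event fired: true
    }
}
```

In both runs the simulated consumer polls every second, yet only the manual-ack run trips the check, which matches the symptom reported here.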

Thanks!

@yihfenglow
Author

After a little more investigation, it looks like the root of the issue is that the `last` variable is meant to capture the time of the last read/commit, not the time of the last poll.

A better fit would be a new `lastSuccessfulPoll` variable, set to `System.currentTimeMillis()` after each call to `this.consumer.poll()` in `KafkaMessageListenerContainer`.

@garyrussell
Contributor

Yes, this is a bug; thanks. We can't update `last` on each poll because it's used for time-based commits, but it's definitely a bug.
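
One way to reconcile the two constraints in this thread (the commit timer must keep its own timestamp, and the responsiveness check needs one that moves on every poll) is to track two separate values. The sketch below is an illustration only. The thread does not show how the actual fix was implemented, and `SeparateTimestampsSketch`, `lastPoll`, `onPollReturned`, and the millisecond parameters are hypothetical names chosen for this example.

```java
// Hypothetical sketch; names and structure are illustrative, not the spring-kafka source.
class SeparateTimestampsSketch {
    private final long ackTimeMs;         // interval between time-based commits
    private final long noPollThresholdMs; // silence after which the consumer counts as stuck
    private long last;                    // time of the last time-based commit
    private long lastPoll;                // time the last poll() returned
    private int commits;

    SeparateTimestampsSketch(long ackTimeMs, long noPollThresholdMs, long startMs) {
        this.ackTimeMs = ackTimeMs;
        this.noPollThresholdMs = noPollThresholdMs;
        this.last = startMs;
        this.lastPoll = startMs;
    }

    /** Called each time a (simulated) poll returns. */
    void onPollReturned(long nowMs) {
        lastPoll = nowMs; // always refreshed, regardless of ack mode
        if (nowMs - last >= ackTimeMs) {
            commits++;    // stand-in for a real offset commit
            last = nowMs; // moves only when a time-based commit happens
        }
    }

    /** Responsiveness check reads lastPoll, never 'last'. */
    boolean isNonResponsive(long nowMs) {
        return nowMs - lastPoll > noPollThresholdMs;
    }

    int commitCount() {
        return commits;
    }

    public static void main(String[] args) {
        SeparateTimestampsSketch s = new SeparateTimestampsSketch(5000, 3000, 0);
        for (long t = 1000; t <= 10000; t += 1000) {
            s.onPollReturned(t);
        }
        System.out.println("commits: " + s.commitCount());               // commits: 2
        System.out.println("stuck at 10500: " + s.isNonResponsive(10500)); // false
        System.out.println("stuck at 20000: " + s.isNonResponsive(20000)); // true
    }
}
```

With this split, frequent polls no longer reset the commit timer, and a long gap between commits no longer looks like a dead consumer.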

@garyrussell garyrussell added this to the 2.1.5 milestone Apr 3, 2018
@garyrussell garyrussell self-assigned this Apr 3, 2018
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Apr 3, 2018
Fixes spring-projects#637

Wrong timestamp used for event publication causing invalid events.
artembilan pushed a commit that referenced this issue Apr 3, 2018
Fixes #637

Wrong timestamp used for event publication causing invalid events.
artembilan pushed a commit that referenced this issue Apr 3, 2018
Fixes #637

Wrong timestamp used for event publication causing invalid events.

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
artembilan pushed a commit that referenced this issue Apr 3, 2018
Fixes #637

Wrong timestamp used for event publication causing invalid events.

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

* Fixed `KafkaMessageListenerContainerTests` for Java 7 and
appropriate Mockito version
denis554 added a commit to denis554/spring-kafka that referenced this issue Mar 27, 2019
Fixes spring-projects/spring-kafka#637

Wrong timestamp used for event publication causing invalid events.
@AleksandarTokarev

@garyrussell `NonResponsiveConsumerEvent` does not seem to work for me with Spring Boot 2.7.8 and Spring Kafka 2.8.1. Is there anything I am missing? My idea is to survive a Kafka broker outage by implementing a custom health check that returns `Status.DOWN`.

@garyrussell
Contributor

I no longer work on this project.

That event is obsolete (this issue is nearly 6 years old). It was used with a much older kafka-clients library that needed a different threading model, in which the consumer thread could get stuck in `poll()`. This is no longer the case.

Unfortunately, the consumer API provides no mechanism to check broker liveness. You could use something like `AdminClient.describeCluster()` to check the status.
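
As a rough illustration of that suggestion, the sketch below maps the outcome of a cluster probe to an up/down status. To keep it runnable without kafka-clients on the classpath, the probe is passed in as a `Callable<Integer>`. The class `BrokerLivenessSketch`, its local `Status` enum (a stand-in for the health status mentioned earlier in the thread), and the 5-second timeout in the comment are assumptions made for this example, not part of any library.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; the probe is injected so the sketch runs without a broker.
class BrokerLivenessSketch {
    enum Status { UP, DOWN } // local stand-in for a health-check status type

    /** Runs the probe; an exception (timeouts included) or an empty cluster maps to DOWN. */
    static Status check(Callable<Integer> nodeCountProbe) {
        try {
            Integer nodes = nodeCountProbe.call();
            return (nodes != null && nodes > 0) ? Status.UP : Status.DOWN;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
            }
            return Status.DOWN;
        }
    }

    public static void main(String[] args) {
        // With kafka-clients available and an existing AdminClient named 'admin',
        // the probe could be written as:
        //   () -> admin.describeCluster().nodes().get(5, TimeUnit.SECONDS).size()
        System.out.println(check(() -> 3)); // simulated healthy cluster
        System.out.println(check(() -> {    // simulated unreachable broker
            throw new TimeoutException("no broker");
        }));
    }
}
```

A bounded `get(timeout, unit)` on the returned future matters here: without it, a health endpoint could block for as long as the client keeps retrying the unreachable broker.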
