
Losing messages in RetryingBatchErrorHandler/ErrorHandlingUtils during rebalancing #2340



Closed
vooft opened this issue Jul 9, 2022 · 5 comments · Fixed by #2341

Comments

@vooft
Contributor

vooft commented Jul 9, 2022

In what version(s) of Spring for Apache Kafka are you seeing this issue?

2.3.7+ (since the introduction of RetryingBatchErrorHandler)

Describe the bug

If a listener is being retried and a rebalance kicks in, the consumer is unpaused, so every keep-alive consumer.poll() invocation in the error handler starts returning records, and the error handler throws them away.

consumer.pause(consumer.assignment());
try {
    while (nextBackOff != BackOffExecution.STOP) {
        consumer.poll(Duration.ZERO); // after rebalancing this poll will start returning records
        try {
...

It seems that the rebalance listener in KafkaMessageListenerContainer can re-pause the consumer, but it is not aware that the error handler paused it.
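A toy model in plain Java may make the mechanism easier to follow. It is not the Kafka client or spring-kafka code: `ToyConsumer`, its `rebalance` method, the `retrying` flag and `recordsDiscarded` are all hypothetical, and the model assumes an eager rebalance in which every partition is revoked and reassigned, so none of them is paused afterwards. With `repauseOnAssign` off it reproduces the reported loss; with it on, the rebalance callback re-applies the error handler's pause. That is one possible way to make the rebalance listener aware of the pause, not necessarily what the eventual fix does.

```java
import java.util.*;
import java.util.function.Consumer;

/** Toy model (hypothetical, not the Kafka API) of the retry loop losing records on rebalance. */
final class RepauseOnAssignDemo {

    /** Hypothetical stand-in for a KafkaConsumer: partition name -> pending records. */
    static final class ToyConsumer {
        final Map<String, Deque<String>> queues = new HashMap<>();
        final Set<String> assignment = new HashSet<>();
        final Set<String> paused = new HashSet<>();
        Consumer<Set<String>> onPartitionsAssigned = partitions -> { };

        void pause(Collection<String> partitions) { paused.addAll(partitions); }

        Set<String> assignment() { return new HashSet<>(assignment); }

        /** New assignment starts unpaused (eager rebalance); the callback runs before any fetch. */
        void rebalance(Collection<String> newAssignment) {
            assignment.clear();
            assignment.addAll(newAssignment);
            paused.clear();
            onPartitionsAssigned.accept(assignment());
        }

        /** Returns (and removes) all records from assigned, non-paused partitions. */
        List<String> poll() {
            List<String> out = new ArrayList<>();
            for (String tp : assignment) {
                Deque<String> q = queues.get(tp);
                while (!paused.contains(tp) && q != null && !q.isEmpty()) {
                    out.add(q.poll());
                }
            }
            return out;
        }
    }

    /** Runs the keep-alive retry loop and counts records returned by poll() and thrown away. */
    static int recordsDiscarded(boolean repauseOnAssign, int attempts, int rebalanceAtAttempt) {
        ToyConsumer consumer = new ToyConsumer();
        consumer.assignment.addAll(List.of("t-0", "t-1"));
        consumer.queues.put("t-0", new ArrayDeque<>(List.of("a", "b")));
        consumer.queues.put("t-1", new ArrayDeque<>(List.of("c")));

        boolean[] retrying = {false}; // hypothetical "paused by the error handler" flag
        consumer.onPartitionsAssigned = partitions -> {
            if (repauseOnAssign && retrying[0]) {
                consumer.pause(partitions); // re-apply the error handler's pause
            }
        };

        retrying[0] = true;
        consumer.pause(consumer.assignment()); // paused once, before the loop, as in the issue
        int discarded = 0;
        try {
            for (int attempt = 0; attempt < attempts; attempt++) {
                if (attempt == rebalanceAtAttempt) {
                    consumer.rebalance(List.of("t-0", "t-1"));
                }
                discarded += consumer.poll().size(); // keep-alive poll; results are dropped
                // (the failing listener would be re-invoked here)
            }
        } finally {
            retrying[0] = false; // retries over: a real handler would resume the consumer here
        }
        return discarded;
    }

    public static void main(String[] args) {
        System.out.println("without re-pause: " + recordsDiscarded(false, 5, 2));
        System.out.println("with re-pause:    " + recordsDiscarded(true, 5, 2));
    }
}
```

With five attempts and a rebalance before the third, the model drains all three pending records when the pause is not re-applied, and none when it is.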

To Reproduce

Create a listener that always fails, configured with a RetryingBatchErrorHandler that retries indefinitely, and force a rebalance. After the rebalance finishes, the RetryingBatchErrorHandler drains all the messages from the assigned partitions without processing them.

Expected behavior

The RetryingBatchErrorHandler should re-pause the consumer after a rebalance.

Sample

The test uses RetryingBatchErrorHandler, but the logic in ErrorHandlingUtils is essentially the same:
https://github.com/vooft/kafka-retry-issue/blob/master/src/test/java/com/example/kafkaissue/KafkaIssueTest.java

@garyrussell
Contributor

Thanks for reporting; this is indeed a bug.

Please note that the RecoveringBatchErrorHandler is the preferred mechanism (before 2.8), and its functionality is the default in the DefaultErrorHandler since 2.8. With that mechanism, the listener throws a BatchListenerFailedException to indicate which record in the batch failed.
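The difference between the two strategies can be sketched with another toy model. This is not spring-kafka code: `BatchFailedAt` is a hypothetical stand-in for BatchListenerFailedException that carries the index of the failed record, and `consume` is a simplified, hypothetical error-handling loop. Because the listener says which record failed, the records before it are treated as processed, only the failed record is retried, and once retries are exhausted it is handed to a recoverer (for example a dead-letter publisher) instead of blocking the whole batch.

```java
import java.util.*;
import java.util.function.Predicate;

/** Toy model (hypothetical) of the "recovering" batch strategy. */
final class RecoveringBatchDemo {

    /** Hypothetical stand-in for BatchListenerFailedException: carries the failed index. */
    static final class BatchFailedAt extends RuntimeException {
        final int index;

        BatchFailedAt(int index) {
            super("record at index " + index + " failed");
            this.index = index;
        }
    }

    /** Listener: processes records in order and reports the index of the first failure. */
    static void listener(List<String> batch, Predicate<String> fails, List<String> processed) {
        for (int i = 0; i < batch.size(); i++) {
            if (fails.test(batch.get(i))) {
                throw new BatchFailedAt(i);
            }
            processed.add(batch.get(i));
        }
    }

    /** Error-handling loop; returns the records handed to the (hypothetical) recoverer. */
    static List<String> consume(List<String> batch, Predicate<String> fails, int maxRetries,
                                List<String> processed) {
        List<String> recovered = new ArrayList<>();
        List<String> remaining = new ArrayList<>(batch);
        int retries = 0;
        while (!remaining.isEmpty()) {
            try {
                listener(remaining, fails, processed);
                remaining.clear();
            } catch (BatchFailedAt ex) {
                // Records before ex.index succeeded: treat them as committed and drop them.
                remaining = new ArrayList<>(remaining.subList(ex.index, remaining.size()));
                if (retries < maxRetries) {
                    retries++;                          // redeliver starting at the failed record
                } else {
                    recovered.add(remaining.remove(0)); // retries exhausted: recover it, move on
                    retries = 0;
                }
            }
        }
        return recovered;
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        List<String> recovered = consume(List.of("a", "b", "X", "c"), s -> s.equals("X"), 2, processed);
        System.out.println("processed=" + processed + " recovered=" + recovered);
    }
}
```

In this model each good record is processed exactly once and only the poison record is retried, whereas the retrying strategy replays the entire batch on every attempt.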

@vooft
Contributor Author

vooft commented Jul 11, 2022

Thank you for your answer @garyrussell
I'm looking at DefaultErrorHandler, and it seems that if you just throw an arbitrary exception from the listener, it will still delegate the handling to RetryingBatchErrorHandler, not RecoveringBatchErrorHandler. Or am I missing something?

In spring-kafka 2.8.x:

The default fallback is FallbackBatchErrorHandler:
https://github.com/spring-projects/spring-kafka/blob/2.8.x/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java#L93

and FallbackBatchErrorHandler extends RetryingBatchErrorHandler:
https://github.com/spring-projects/spring-kafka/blob/2.8.x/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java#L32
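The dispatch described in this comment can be modeled as a check on the thrown exception. The sketch below is hypothetical and is not the DefaultErrorHandler source: `BatchFailedAt` stands in for BatchListenerFailedException, and the model assumes the handler searches the whole cause chain, since a listener exception usually reaches the error handler wrapped in another exception. Only when that exception is found is the recovering path taken; anything else goes to the fallback, i.e. the retrying handler affected by this issue.

```java
/** Toy model (hypothetical) of choosing between the recovering and fallback paths. */
final class BatchDispatchDemo {

    /** Hypothetical stand-in for BatchListenerFailedException. */
    static final class BatchFailedAt extends RuntimeException {
        BatchFailedAt(String message) {
            super(message);
        }
    }

    /** Walks the cause chain looking for the "which record failed" exception. */
    static BatchFailedAt findBatchFailure(Throwable thrown) {
        for (Throwable t = thrown; t != null; t = t.getCause()) {
            if (t instanceof BatchFailedAt) {
                return (BatchFailedAt) t;
            }
        }
        return null;
    }

    static String strategyFor(Throwable thrown) {
        return findBatchFailure(thrown) != null ? "recovering" : "fallback (retrying)";
    }

    public static void main(String[] args) {
        System.out.println(strategyFor(new IllegalStateException("boom")));
        System.out.println(strategyFor(new RuntimeException(new BatchFailedAt("index 2"))));
    }
}
```

Under this model, an arbitrary exception such as IllegalStateException lands on the fallback path, which matches the observation in this comment.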

@garyrussell
Contributor

Yes, I agree; I was just pointing out that the recovering mechanism is preferred over the (older) retrying mechanism, which suffers from this bug. I will have a fix soon.

@vooft
Contributor Author

vooft commented Jul 11, 2022

Gotcha, thank you!

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jul 11, 2022
Resolves spring-projects#2340

The `RetryingBatchErrorHandler` (now called the `FallbackBatchErrorHandler`)
pauses the consumer during retries and resumes it afterwards, so that it can
keep polling the consumer and avoid a forced rebalance.

However, if a normal rebalance occurs, for example if a new member joins,
the error handler does not re-pause the consumer and silently consumes
new records.

Add a mechanism to always re-pause the consumer when in this retry mode.

**cherry-pick to 2.9.x, 2.8.x**
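The reason for polling during retries is Kafka's `max.poll.interval.ms` consumer setting (300000 ms, i.e. five minutes, by default): a member that does not call `poll()` within that interval is considered failed and leaves the group, which triggers a rebalance. The hypothetical helper below is not spring-kafka code; it only does the arithmetic for a long retry sequence, assuming each attempt costs one back-off sleep plus one listener run and that a keep-alive poll (with the partitions paused) resets the clock.

```java
/** Toy arithmetic (hypothetical): longest gap between poll() calls during retries. */
final class KeepAliveDemo {

    /** Longest time between two poll() calls for the given back-off sequence. */
    static long longestGapMs(long[] backOffsMs, long listenerTimeMs, boolean pollBetweenAttempts) {
        long longest = 0;
        long gap = 0;
        for (long backOff : backOffsMs) {
            gap += backOff + listenerTimeMs; // sleep, then re-run the failing listener
            if (pollBetweenAttempts) {
                longest = Math.max(longest, gap);
                gap = 0;                     // a (paused) keep-alive poll resets the clock
            }
        }
        return Math.max(longest, gap);
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000;    // Kafka's default max.poll.interval.ms
        long[] backOffs = {60_000, 60_000, 60_000, 60_000, 60_000, 60_000};
        for (boolean keepAlive : new boolean[] {false, true}) {
            long gap = longestGapMs(backOffs, 1_000, keepAlive);
            System.out.printf("keep-alive=%b longest gap=%d ms exceeds limit=%b%n",
                    keepAlive, gap, gap > maxPollIntervalMs);
        }
    }
}
```

Six one-minute back-offs without polling add up to a gap well over five minutes, while polling between attempts keeps every gap near one minute. This is why the handler keeps polling, and why those polls must not return records while a batch is still being retried.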
artembilan pushed a commit that referenced this issue Jul 11, 2022
artembilan pushed a commit that referenced this issue Jul 11, 2022
garyrussell added a commit that referenced this issue Jul 11, 2022
@garyrussell
Contributor

@vooft 2.8.8 with the fix is now in Maven Central.


2 participants