Remove sharing of the retrying field in FallbackBatchErrorHandler #2383

Merged (2 commits) on Aug 29, 2022

@vooft (Contributor) commented Aug 26, 2022

This PR fixes 2 issues:

  1. If an exception is thrown from ErrorHandlingUtils.retryBatch (for instance, if SeekToCurrentBatchErrorHandler fails), the retrying field is never reset to false.
  2. Error handlers are shared between consumers when concurrency > 1. If one consumer is retrying when a rebalance happens, other consumers may be paused and never recover.

I'm not sure it is a good idea in general to add state to a shared class; the state should probably belong to a particular consumer rather than to the error handler.

Fixes #2382
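
The first issue in the list above can be illustrated with a minimal sketch. RetryFlagSketch and its methods are hypothetical names invented for this example; this is not the FallbackBatchErrorHandler source, only the general shape of a flag that is set before a retry and must be reset on every exit path.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a handler that tracks a "retrying" flag. */
class RetryFlagSketch {

    private final AtomicBoolean retrying = new AtomicBoolean(false);

    /** Problematic shape: if the retry throws, the flag stays true. */
    void handleWithoutFinally(Runnable retry) {
        this.retrying.set(true);
        retry.run();
        this.retrying.set(false); // skipped when retry.run() throws
    }

    /** Safer shape: the flag is reset whether or not the retry throws. */
    void handleWithFinally(Runnable retry) {
        this.retrying.set(true);
        try {
            retry.run();
        }
        finally {
            this.retrying.set(false);
        }
    }

    boolean isRetrying() {
        return this.retrying.get();
    }
}
```

With handleWithoutFinally, a retry that throws leaves isRetrying() returning true indefinitely; with handleWithFinally the flag is cleared before the exception propagates.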

@garyrussell (Contributor)

Ouch! Good catch. Thanks.

Using a ThreadLocal (TL) is fine since there is a 1:1 relationship between thread and consumer.
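
As a rough illustration of why a ThreadLocal isolates the flag per consumer thread, here is a small JDK-only sketch. PerThreadRetryFlag and otherThreadSeesRetrying are hypothetical names for this example and are not taken from the PR; the sketch assumes, as stated above, that each consumer runs on its own thread.

```java
/** Hypothetical handler shared by several threads, with per-thread retry state. */
class PerThreadRetryFlag {

    // Each thread gets its own copy, initially false.
    private final ThreadLocal<Boolean> retrying = ThreadLocal.withInitial(() -> false);

    void setRetrying(boolean value) {
        this.retrying.set(value);
    }

    boolean isRetrying() {
        return this.retrying.get();
    }

    /** Returns what a second thread observes while the calling thread is marked as retrying. */
    static boolean otherThreadSeesRetrying() {
        PerThreadRetryFlag handler = new PerThreadRetryFlag();
        handler.setRetrying(true); // the calling thread is "retrying"
        boolean[] seen = new boolean[1];
        Thread other = new Thread(() -> seen[0] = handler.isRetrying());
        other.start();
        try {
            other.join(); // join() makes the write to seen[0] visible here
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        finally {
            handler.setRetrying(false);
        }
        return seen[0];
    }
}
```

In this sketch the second thread reads its own initial value, so one thread being in the retrying state does not affect what another thread sees on the same shared handler instance.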

@vooft (Contributor Author) commented Aug 27, 2022

Fixed checkstyle

@vooft (Contributor Author) commented Aug 27, 2022

@garyrussell I'm not sure whether the failed test is related to my change; the build is green on my machine.

KafkaMessageListenerContainerTests > testInvokeRecordInterceptorAllSkipped(AckMode, boolean) > org.springframework.kafka.listener.KafkaMessageListenerContainerTests.testInvokeRecordInterceptorAllSkipped(AckMode, boolean)[1] FAILED
    org.mockito.exceptions.verification.VerificationInOrderFailure: 
    Verification in order failure
    Wanted but not invoked:
    object.intercept(
        ConsumerRecord(topic = foo, partition = 0, leaderEpoch = null, offset = 1, NoTimestampType = -1, serialized key size = -1, serialized value size = -1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = bar),
        Mock for Consumer, hashCode: 1199858180
    );
    -> at org.springframework.kafka.listener.KafkaMessageListenerContainerTests.testInvokeRecordInterceptorAllSkipped(KafkaMessageListenerContainerTests.java:3812)
    Wanted anywhere AFTER following interaction:
    consumer.commitSync(
        {foo-0 = OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}},
        PT1M
    );
    -> at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:3189)
        at app//org.springframework.kafka.listener.KafkaMessageListenerContainerTests.testInvokeRecordInterceptorAllSkipped(KafkaMessageListenerContainerTests.java:3812)

@garyrussell (Contributor)

Probably an unrelated flaky test. Will take a look on Monday.

@garyrussell garyrussell merged commit 612eb70 into spring-projects:main Aug 29, 2022
garyrussell pushed a commit that referenced this pull request Aug 29, 2022
Remove sharing of the retrying field in FallbackBatchErrorHandler (#2383)

* Remove sharing of the retrying field in FallbackBatchErrorHandler

* Fix checkstyle

@garyrussell (Contributor)

Cherry-picked to 2.9.x, backported to 2.8.x.

6a28cc5
3de1e89

Thanks @vooft - in future, please limit first commit headline to 50 chars.

@vooft (Contributor Author) commented Aug 29, 2022

@garyrussell thank you! Sure, will keep it short next time.

Linked issue: FallbackBatchErrorHandler may cause consumers to pause even if they are not retrying