Provide ability to disable/limit retries in batch listener when using DefaultAfterRollbackProcessor #2588

Closed
RuslanHryn opened this issue Feb 16, 2023 · 7 comments · Fixed by #2888

@RuslanHryn

Expected Behavior
A configuration option to disable or limit retries in a batch listener when using DB transactions and DefaultAfterRollbackProcessor.
Example listener: @KafkaListener(topics = BATCH_TOPIC, batch = "true") {}

Current Behavior
Currently, the DefaultAfterRollbackProcessor retries the failing message indefinitely when a batch listener is used with DB transactions.

Context
The main problem is that if we forget to handle a failing message inside the listener, the failed message stays stuck until we take manual action to skip it. That's why we would like a setting to disable or limit retries at the configuration level.
I tried to disable retries for batch listeners as follows, without success:
BackOff backOff = new FixedBackOff(0, 0);
new DefaultAfterRollbackProcessor<>(deadLetterPublishingRecoverer, backOff);

A workaround is to always force recoverable=true in order to limit retries:

return new DefaultAfterRollbackProcessor<>(deadLetterPublishingRecoverer, backOff) {
    @Override
    public void process(
            @NotNull List<ConsumerRecord<Object, Object>> consumerRecords,
            @NotNull org.apache.kafka.clients.consumer.Consumer<Object, Object> consumer,
            MessageListenerContainer container,
            @NotNull Exception exception,
            boolean recoverable,
            @NotNull ContainerProperties.EOSMode eosMode) {
        // Force all messages to be recoverable.
        super.process(consumerRecords, consumer, container, exception, true, eosMode);
    }
};
@RuslanHryn changed the title from "Provide ability to disable/limit retries in batch listener when use DefaultAfterRollbackProcessor" to "Provide ability to disable/limit retries in batch listener when using DefaultAfterRollbackProcessor" on Feb 16, 2023
@garyrussell
Contributor

garyrussell commented Feb 16, 2023

Forcing it to be recoverable will not do what you expect. We don't know which record in the batch failed, the whole batch is passed in and the "recovered" record will just be the first one in the batch and not necessarily the one that is causing the failure; that's why batch failures are not recoverable.
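
To illustrate the point, here is a minimal, self-contained simulation in plain Java (no Spring or Kafka involved). The class, the method names and the record values are hypothetical, and the loop is only a simplified model of "recover the first record, redeliver the rest", not the framework's actual code.

```java
import java.util.ArrayList;
import java.util.List;

class FirstRecordRecoveryDemo {

    /** Hypothetical batch listener: the whole batch fails if it contains the poison record. */
    static void listen(List<String> batch, List<String> processed) {
        if (batch.contains("bad")) {
            throw new IllegalStateException("batch failed");
        }
        processed.addAll(batch);
    }

    /**
     * Simplified model: after each failure only the FIRST record of the batch is
     * "recovered" (dead-lettered) and the remaining records are redelivered.
     */
    static List<String> runRecoveringFirstRecord(List<String> records, List<String> processed) {
        List<String> deadLettered = new ArrayList<>();
        List<String> remaining = new ArrayList<>(records);
        while (!remaining.isEmpty()) {
            try {
                listen(remaining, processed);
                remaining.clear();
            }
            catch (IllegalStateException ex) {
                deadLettered.add(remaining.remove(0));
            }
        }
        return deadLettered;
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        List<String> dlt = runRecoveringFirstRecord(List.of("r0", "r1", "bad", "r3"), processed);
        System.out.println("dead-lettered: " + dlt);   // dead-lettered: [r0, r1, bad]
        System.out.println("processed: " + processed); // processed: [r3]
    }
}
```

In this model the healthy records r0 and r1 are dead-lettered before the poison record is finally reached, which is the kind of outcome described above.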

To solve this problem, we should probably enhance the DARP (DefaultAfterRollbackProcessor) to understand BatchListenerFailedExceptions (like the DefaultErrorHandler now does) so the app can indicate which record caused the failure and only recover that one.

We could also provide recovery for other exceptions by recovering every record in the batch (again, like the DefaultErrorHandler).
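
As a sketch of how an application can indicate the failing record, a batch listener can throw a BatchListenerFailedException carrying the index of the record that failed. The fragment below needs spring-kafka on the classpath; the topic name "orders", the class name and the handle(...) method are hypothetical placeholders, and whether a given error handler or rollback processor acts on the index depends on the version and configuration in use.

```java
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.stereotype.Component;

@Component
class OrderBatchListener {

    // "orders" is a hypothetical topic name.
    @KafkaListener(topics = "orders", batch = "true")
    public void listen(List<ConsumerRecord<String, String>> records) {
        for (int i = 0; i < records.size(); i++) {
            try {
                handle(records.get(i));
            }
            catch (RuntimeException ex) {
                // Report the index of the failing record so that a handler which
                // understands this exception can act on that record alone.
                throw new BatchListenerFailedException("Failed to process record", ex, i);
            }
        }
    }

    // Hypothetical application logic.
    private void handle(ConsumerRecord<String, String> record) {
    }
}
```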

Note that if you are only using DB transactions (and not publishing to Kafka as well as updating the DB in the listener), there is no need to make the container transactional.

@garyrussell garyrussell added this to the 3.x Backlog milestone Feb 16, 2023
@RuslanHryn
Author

> Forcing it to be recoverable will not do what you expect. We don't know which record in the batch failed, the whole batch is passed in and the "recovered" record will just be the first one in the batch and not necessarily the one that is causing the failure; that's why batch failures are not recoverable.

Yes, you are right. It will just limit the retries, so they will not be infinite.

> Note that if you are only using DB transactions (and not publishing to Kafka as well as updating the DB in the listener), there is no need to make the container transactional.

We always update the DB in a RecordFilterStrategy, saving processed messages there in order to implement the idempotent consumer pattern. That's why we can't use DefaultErrorHandler.

@garyrussell
Contributor

garyrussell commented Feb 16, 2023

You don’t need Kafka transactions if you are not producing to Kafka in your listener. They have nothing to do with idempotent consumers, only exactly once semantics for consume->process->produce scenarios.

Or, for atomically producing multiple records.

@RuslanHryn
Author

> You don’t need Kafka transactions if you are not producing to Kafka in your listener. They have nothing to do with idempotent consumers, only exactly once semantics for consume->process->produce scenarios.
>
> Or, for atomically producing multiple records.

We don't use Kafka transactions at all. We use DB transactions instead to implement an idempotent consumer. In any case, it does not matter which kind of transaction is used; the retry behavior is the same. So it would be nice to have proper retries in the batch listener with DefaultAfterRollbackProcessor, or an option to disable retries to avoid the infinite loop.
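
The behavior being asked for can be sketched as a bounded redelivery loop. The example below is plain Java with hypothetical names (BoundedBatchRetryDemo, deliver, maxRetries); it only models the idea of "retry the batch a limited number of times, then hand the whole batch to a recoverer" and is not the framework's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BoundedBatchRetryDemo {

    /**
     * Delivers the batch at most 1 + maxRetries times and returns the number of deliveries.
     * If every delivery fails, the whole batch is handed to the recoverer instead of
     * being redelivered forever.
     */
    static int deliver(List<String> batch, int maxRetries,
            Consumer<List<String>> listener, Consumer<List<String>> recoverer) {
        int deliveries = 0;
        while (true) {
            deliveries++;
            try {
                listener.accept(batch);
                return deliveries;
            }
            catch (RuntimeException ex) {
                if (deliveries > maxRetries) {
                    // Retries exhausted: recover (skip) the batch and stop.
                    recoverer.accept(batch);
                    return deliveries;
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> recovered = new ArrayList<>();
        Consumer<List<String>> alwaysFails = b -> {
            throw new IllegalStateException("boom");
        };
        int deliveries = deliver(List.of("a", "b"), 2, alwaysFails, recovered::addAll);
        System.out.println(deliveries + " deliveries, recovered " + recovered);
    }
}
```

With maxRetries set to 0 the batch is delivered once and then recovered, which corresponds to the "disable retries" case.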

@garyrussell
Contributor

I agree; hence my comments above about enhancing the rollback processor.

However, adding a transaction manager to the container is only intended for the use of Kafka transactions.

You can use @Transactional on the listener method or, to include the listener adapter (including any RecordFilterStrategy) in the scope of the transaction, add a TransactionInterceptor to the container's adviceChain property.
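
A minimal sketch of the adviceChain approach, assuming a Spring application with spring-kafka and spring-tx on the classpath. The configuration class name and the bean parameter names are hypothetical, and MatchAlwaysTransactionAttributeSource is used here only as the simplest way to apply default transaction attributes to every listener invocation.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.interceptor.MatchAlwaysTransactionAttributeSource;
import org.springframework.transaction.interceptor.TransactionInterceptor;

@Configuration
class ListenerTxConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            PlatformTransactionManager dbTransactionManager) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);

        // Run each listener invocation (including the listener adapter and any
        // RecordFilterStrategy) inside a DB transaction. No transaction manager
        // is set on the container itself.
        TransactionInterceptor txAdvice = new TransactionInterceptor(
                dbTransactionManager, new MatchAlwaysTransactionAttributeSource());
        factory.getContainerProperties().setAdviceChain(txAdvice);
        return factory;
    }
}
```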

The container should never have a transaction manager if Kafka transactions are not being used.

@Wzy19930507
Contributor

Hi @garyrussell, may I pick this up?

@artembilan
Member

@Wzy19930507,

Sure you can!

This is an Open Source project, so any contribution is welcome!
I cannot advise on exactly what to do at the moment, but let's first see whether you have something in mind for how to fix it.
Don't hesitate to open a pull request: https://github.com/spring-projects/spring-kafka/blob/main/CONTRIBUTING.adoc

Wzy19930507 pushed a commit to Wzy19930507/spring-kafka that referenced this issue Nov 9, 2023
spring-projectsGH-2588: ARBP support BatchListenerFailedException

* add default function `processBatch` in AfterRollbackProcessor

* support BatchListenerFailedException in DefaultAfterRollbackProcessor

* Add unit tests
Wzy19930507 pushed a commit to Wzy19930507/spring-kafka that referenced this issue Nov 9, 2023
Resolves spring-projects#2588

* add default function `processBatch` in AfterRollbackProcessor

* support BatchListenerFailedException in DefaultAfterRollbackProcessor

* Add unit tests
Wzy19930507 pushed a commit to Wzy19930507/spring-kafka that referenced this issue Nov 13, 2023
spring-projectsGH-2588: ARBP support BatchListenerFailedException

* support retry and recover in `DefaultAfterRollbackProcessor.processBatch`

* add nextBackOff at `ListenerUtils`

* fix assertThat `KafkaHeaders.DLT_ORIGINAL_OFFSET` at `TransactionalContainerTests.testMaxFailures`
Wzy19930507 pushed a commit to Wzy19930507/spring-kafka that referenced this issue Nov 17, 2023
…ckProcessor`

* add method processBatch at `AfterRollbackProcessor`

* add opt-in property `batchRecoverAfterRollback` at `ContainerProperties`

* change format to `BatchListenerFailedException.getMessage`

* add batch recoverable after rollback unit test
Wzy19930507 pushed a commit to Wzy19930507/spring-kafka that referenced this issue Nov 21, 2023
Wzy19930507 pushed a commit to Wzy19930507/spring-kafka that referenced this issue Nov 21, 2023
…ckProcessor`

deprecated transactionManager and add KafkaAwareTransactionManager at ContainerProperties, after remove transactionManager, change `KafkaMessageListenerContainer#transactionManager` type to PlatformTransactionManager
Wzy19930507 pushed a commit to Wzy19930507/spring-kafka that referenced this issue Dec 16, 2023
…ckProcessor`

modify toString in `ContainerProperties`
Wzy19930507 pushed a commit to Wzy19930507/spring-kafka that referenced this issue Dec 16, 2023
@artembilan artembilan modified the milestones: 3.1 Backlog, Backlog Jan 16, 2024
Wzy19930507 pushed a commit to Wzy19930507/spring-kafka that referenced this issue Jan 18, 2024
…ckProcessor`

* add method processBatch at `AfterRollbackProcessor`

* add opt-in property `batchRecoverAfterRollback` at `ContainerProperties`

* change format to `BatchListenerFailedException.getMessage`

* add batch recoverable after rollback unit test
Wzy19930507 added a commit to Wzy19930507/spring-kafka that referenced this issue Jan 18, 2024
…ckProcessor`

deprecated transactionManager and add KafkaAwareTransactionManager at ContainerProperties, after remove transactionManager, change `KafkaMessageListenerContainer#transactionManager` type to PlatformTransactionManager

* modify toString in `ContainerProperties`
sobychacko pushed a commit that referenced this issue Feb 5, 2024
* support batch recoverable `DefaultAfterRollbackProcessor`
* add method processBatch at `AfterRollbackProcessor`
* add opt-in property `batchRecoverAfterRollback` at `ContainerProperties`
* change format to `BatchListenerFailedException.getMessage`
* add batch recoverable after rollback unit test
* review fix
* `what-new.adoc` and `annotation-error-handling.adoc`
* add javadoc in `SeekUtils` and `AfterRollbackProcessor`
* change `ListenerUtils.nextBackOff` public to default
* change logger args to static string
* @author classes.
* fix adoc
* polish `AfterRollbackProcessor`
* javadoc at `ContainerProperties`
* fix review and fix test bug at DefaultAfterRollbackProcessorTests
* add @test to DefaultAfterRollbackProcessorTests.testNoEarlyExitBackOff
* polish TransactionalContainerTests
* fix bug Tests at DefaultAfterRollbackProcessorTests method `testNoEarlyExitBackOff` and testEarlyExitBackOff
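
For readers who want to try the opt-in property named in the commit above, the following is a hedged configuration sketch rather than an authoritative recipe. It assumes a spring-kafka version that includes this change, a container that is already configured for Kafka transactions elsewhere (not shown), and a KafkaTemplate backed by a transactional producer factory. The class name, bean parameters and back-off values are hypothetical. Passing the template with commitRecovered=true is an assumption about what batch recovery needs; check the reference documentation for the exact requirements of your version.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
class BatchRecoveryConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            KafkaTemplate<String, String> template) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);

        // Opt in to recovering a failed batch after the transaction is rolled back.
        factory.getContainerProperties().setBatchRecoverAfterRollback(true);

        // Hypothetical values: retry twice, one second apart, then publish the
        // records to the dead-letter topic.
        DefaultAfterRollbackProcessor<Object, Object> afterRollbackProcessor =
                new DefaultAfterRollbackProcessor<>(
                        new DeadLetterPublishingRecoverer(template),
                        new FixedBackOff(1000L, 2L),
                        template,
                        true);
        factory.setAfterRollbackProcessor(afterRollbackProcessor);
        return factory;
    }
}
```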
@sobychacko sobychacko modified the milestones: Backlog, 3.2.0-M1 Apr 17, 2024