From 8ed559de4134d66746e5b5369ba8f288e7c6d954 Mon Sep 17 00:00:00 2001
From: Gary Russell
Date: Thu, 19 Apr 2018 12:14:54 -0400
Subject: [PATCH] GH-656: Fix seek on rollback

Fixes https://github.com/spring-projects/spring-kafka/issues/656
Fixes https://github.com/spring-projects/spring-kafka/issues/657

Previously, after a rollback, we only performed a `seek` on the failed
record. We need to seek for all unprocessed records.

Also, when no error handler was provided, and using a batch listener,
the offsets were added to `acks` and incorrectly committed (#657).

Also, if a `ContainerAwareErrorHandler` "handles" the error, the offsets
weren't committed.

Enhance the tests to verify full seeks.

Add a new test to verify the batch listener doesn't commit after a
rollback.

**cherry-pick to 2.1.x, 2.0.x**

I will backport to 1.3.x after review.

* Some simple polishing
---
 ...AbstractKafkaListenerContainerFactory.java | 17 ++++
 .../AbstractMessageListenerContainer.java     | 18 ++++
 .../listener/AfterRollbackProcessor.java      | 49 ++++++++++
 .../DefaultAfterRollbackProcessor.java        | 69 +++++++++++++
 .../KafkaMessageListenerContainer.java        | 98 +++++++++----------
 ...ncurrentMessageListenerContainerTests.java |  2 +-
 .../listener/TransactionalContainerTests.java | 94 ++++++++++++++++--
 src/reference/asciidoc/kafka.adoc             | 11 +++
 src/reference/asciidoc/whats-new.adoc         |  4 +
 9 files changed, 302 insertions(+), 60 deletions(-)
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java
index 6cde6b6362..ad1a2784e9 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java
@@ -24,6 +24,7 @@
 import org.springframework.context.ApplicationEventPublisherAware;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
+import org.springframework.kafka.listener.AfterRollbackProcessor;
 import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
 import org.springframework.kafka.listener.config.ContainerProperties;
 import org.springframework.kafka.support.converter.MessageConverter;
@@ -68,6 +69,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
 
 	private ApplicationEventPublisher applicationEventPublisher;
 
+	private AfterRollbackProcessor<K, V> afterRollbackProcessor;
+
 	/**
 	 * Specify a {@link ConsumerFactory} to use.
 	 * @param consumerFactory The consumer factory.
@@ -162,6 +165,17 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
 		this.applicationEventPublisher = applicationEventPublisher;
 	}
 
+	/**
+	 * Set a processor to invoke after a transaction rollback; typically will
+	 * seek the unprocessed topics/partitions to reprocess the records.
+	 * The default does so, including the failed record.
+	 * @param afterRollbackProcessor the processor.
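+	 * @see org.springframework.kafka.listener.DefaultAfterRollbackProcessor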
+	 * @since 1.3.5
+	 */
+	public void setAfterRollbackProcessor(AfterRollbackProcessor<K, V> afterRollbackProcessor) {
+		this.afterRollbackProcessor = afterRollbackProcessor;
+	}
+
 	/**
 	 * Obtain the properties template for this factory - set properties as needed
 	 * and they will be copied to a final properties instance for the endpoint.
@@ -232,6 +246,9 @@ protected void initializeContainer(C instance) {
 		ContainerProperties properties = instance.getContainerProperties();
 		BeanUtils.copyProperties(this.containerProperties, properties,
 				"topics", "topicPartitions", "topicPattern", "messageListener", "ackCount", "ackTime");
+		if (this.afterRollbackProcessor != null) {
+			instance.setAfterRollbackProcessor(this.afterRollbackProcessor);
+		}
 		if (this.containerProperties.getAckCount() > 0) {
 			properties.setAckCount(this.containerProperties.getAckCount());
 		}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java
index 76bd6d792d..fa03f85cb9 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java
@@ -115,6 +115,8 @@ public enum AckMode {
 
 	private int phase = DEFAULT_PHASE;
 
+	private AfterRollbackProcessor<K, V> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>();
+
 	private volatile boolean running = false;
 
 	protected AbstractMessageListenerContainer(ContainerProperties containerProperties) {
@@ -189,6 +191,22 @@ public int getPhase() {
 		return this.phase;
 	}
 
+	protected AfterRollbackProcessor<K, V> getAfterRollbackProcessor() {
+		return this.afterRollbackProcessor;
+	}
+
+	/**
+	 * Set a processor to perform seeks on unprocessed records after a rollback.
+	 * The default seeks all topics/partitions to the current position, including
+	 * the failed record.
+	 * @param afterRollbackProcessor the processor.
+	 * @since 1.3.5
+	 */
+	public void setAfterRollbackProcessor(AfterRollbackProcessor<K, V> afterRollbackProcessor) {
+		Assert.notNull(afterRollbackProcessor, "'afterRollbackProcessor' cannot be null");
+		this.afterRollbackProcessor = afterRollbackProcessor;
+	}
+
 	public ContainerProperties getContainerProperties() {
 		return this.containerProperties;
 	}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java
new file mode 100644
index 0000000000..a8790464d4
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.springframework.kafka.listener; + +import java.util.List; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +/** + * Invoked by a listener container with remaining, unprocessed, records + * (including the failed record). Implementations should seek the desired + * topics/partitions so that records will be re-fetched on the next + * poll. When used with a batch listener, the entire batch of records is + * provided. + * + * @param the key type. + * @param the value type. + * + * @author Gary Russell + * + * @since 1.3.5 + * + */ +@FunctionalInterface +public interface AfterRollbackProcessor { + + /** + * Process the remaining records. + * @param records the records. + * @param consumer the consumer. + */ + void process(List> records, Consumer consumer); + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java new file mode 100644 index 0000000000..c9ade1f7e4 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java @@ -0,0 +1,69 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; + +/** + * Default implementation of {@link AfterRollbackProcessor}. Seeks all + * topic/partitions so the records will be re-fetched, including the failed + * record. + * + * @param the key type. + * @param the value type. 
+ *
+ * @author Gary Russell
+ *
+ * @since 1.3.5
+ *
+ */
+public class DefaultAfterRollbackProcessor<K, V> implements AfterRollbackProcessor<K, V> {
+
+	private static final Log logger = LogFactory.getLog(DefaultAfterRollbackProcessor.class);
+
+	@Override
+	public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer) {
+		Map<TopicPartition, Long> seekOffsets = new HashMap<>();
+		Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
+		while (iterator.hasNext()) {
+			ConsumerRecord<K, V> record = iterator.next();
+			TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
+			if (!seekOffsets.containsKey(topicPartition)) {
+				seekOffsets.put(topicPartition, record.offset());
+			}
+		}
+		for (Entry<TopicPartition, Long> entry : seekOffsets.entrySet()) {
+			try {
+				consumer.seek(entry.getKey(), entry.getValue());
+			}
+			catch (Exception e) {
+				logger.error("Failed to seek " + entry.getKey() + " to " + entry.getValue(), e);
+			}
+		}
+	}
+
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index e6989b9eab..15bdb0aa54 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -804,18 +804,7 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
 					}
 					catch (RuntimeException e) {
 						this.logger.error("Transaction rolled back", e);
-						Map<TopicPartition, Long> seekOffsets = new HashMap<>();
-						Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
-						while (iterator.hasNext()) {
-							ConsumerRecord<K, V> record = iterator.next();
-							TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
-							if (!seekOffsets.containsKey(topicPartition)) {
-								seekOffsets.put(topicPartition, record.offset());
-							}
-						}
-						for (Entry<TopicPartition, Long> entry : seekOffsets.entrySet()) {
-							this.consumer.seek(entry.getKey(), entry.getValue());
-						}
+						getAfterRollbackProcessor().process(recordList, this.consumer);
 					}
 				}
 
@@ -850,7 +839,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
 			}
 		}
 		catch (RuntimeException e) {
-			if (this.containerProperties.isAckOnError() && !this.autoCommit) {
+			if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
 				for (ConsumerRecord<K, V> record : getHighestOffsetRecords(recordList)) {
 					this.acks.add(record);
 				}
@@ -860,7 +849,11 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
 			}
 			try {
 				this.batchErrorHandler.handle(e, records);
+				// if the handler handled the error (no exception), go ahead and commit
 				if (producer != null) {
+					for (ConsumerRecord<K, V> record : getHighestOffsetRecords(recordList)) {
+						this.acks.add(record);
+					}
 					sendOffsetsToTransaction(producer);
 				}
 			}
@@ -920,8 +913,12 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
 					}
 					catch (RuntimeException e) {
 						this.logger.error("Transaction rolled back", e);
-						this.consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
-						break;
+						List<ConsumerRecord<K, V>> unprocessed = new ArrayList<>();
+						unprocessed.add(record);
+						while (iterator.hasNext()) {
+							unprocessed.add(iterator.next());
+						}
+						getAfterRollbackProcessor().process(unprocessed, this.consumer);
 					}
 				}
 			}
@@ -957,45 +954,11 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
 			else {
 				this.listener.onMessage(record);
 			}
-			if (this.isRecordAck) {
-				Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
-						Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
-								new OffsetAndMetadata(record.offset() + 1));
-				if (producer == null) {
-					if (this.containerProperties.isSyncCommits()) {
-						this.consumer.commitSync(offsetsToCommit);
-					}
-					else {
-						this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
-					}
-				}
-				else {
-					this.acks.add(record);
-				}
-			}
-			else if (!this.isAnyManualAck && !this.autoCommit) {
-				this.acks.add(record);
-			}
-			if (producer != null) {
-				sendOffsetsToTransaction(producer);
-			}
+			ackCurrent(record, producer);
 		}
 		catch (RuntimeException e) {
 			if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
-				if (this.isRecordAck) {
-					Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
-							Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
-									new OffsetAndMetadata(record.offset() + 1));
-					if (this.containerProperties.isSyncCommits()) {
-						this.consumer.commitSync(offsetsToCommit);
-					}
-					else {
-						this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
-					}
-				}
-				else if (!this.isAnyManualAck) {
-					this.acks.add(record);
-				}
+				ackCurrent(record, producer);
 			}
 			if (this.errorHandler == null) {
 				throw e;
 			}
@@ -1023,6 +986,39 @@ else if (!this.isAnyManualAck) {
 		return null;
 	}
 
+	public void ackCurrent(final ConsumerRecord<K, V> record, @SuppressWarnings("rawtypes") Producer producer) {
+		if (this.isRecordAck) {
+			Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+					Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
+							new OffsetAndMetadata(record.offset() + 1));
+			if (producer == null) {
+				if (this.logger.isDebugEnabled()) {
+					this.logger.debug("Committing: " + offsetsToCommit);
+				}
+				if (this.containerProperties.isSyncCommits()) {
+					this.consumer.commitSync(offsetsToCommit);
+				}
+				else {
+					this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
+				}
+			}
+			else {
+				this.acks.add(record);
+			}
+		}
+		else if (!this.isAnyManualAck && !this.autoCommit) {
+			this.acks.add(record);
+		}
+		if (producer != null) {
+			try {
+				sendOffsetsToTransaction(producer);
+			}
+			catch (Exception e) {
+				this.logger.error("Send offsets to transaction failed", e);
+			}
+		}
+	}
+
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private void sendOffsetsToTransaction(Producer producer) {
 		handleAcks();
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java
index 6c5ef816d1..6234d922c4 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java
@@ -513,7 +513,7 @@ public void testAckOnErrorRecord() throws Exception {
 			}
 		}
 		assertThat(consumer.position(new TopicPartition(topic9, 0))).isEqualTo(1);
-		// this consumer is positioned at 1, the next offset after the successfully
+		// this consumer is positioned at 2, the next offset after the successfully
 		// processed 'qux'
 		// it has been updated even though 'baz' failed
 		for (int i = 0; i < 100; i++) {
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
index 7775310e8a..02637e5f70 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
@@ -33,6 +33,8 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -63,6 +65,7 @@
 import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
 import org.springframework.kafka.listener.config.ContainerProperties;
+import org.springframework.kafka.support.TopicPartitionInitialOffset;
 import org.springframework.kafka.test.rule.KafkaEmbedded;
 import org.springframework.kafka.test.utils.KafkaTestUtils;
 import org.springframework.kafka.transaction.KafkaTransactionManager;
@@ -161,14 +164,79 @@ private void testConsumeAndProduceTransactionGuts(AckMode ackMode) throws Except
 
 	@Test
 	public void testConsumeAndProduceTransactionRollback() throws Exception {
 		Consumer consumer = mock(Consumer.class);
-		final TopicPartition topicPartition = new TopicPartition("foo", 0);
+		final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
+		final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
+		Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = new HashMap<>();
+		recordMap.put(topicPartition0, Collections.singletonList(new ConsumerRecord<>("foo", 0, 0, "key", "value")));
+		recordMap.put(topicPartition1, Collections.singletonList(new ConsumerRecord<>("foo", 1, 0, "key", "value")));
+		ConsumerRecords records = new ConsumerRecords(recordMap);
+		final AtomicBoolean done = new AtomicBoolean();
 		willAnswer(i -> {
-			i.getArgumentAt(1, ConsumerRebalanceListener.class)
-					.onPartitionsAssigned(Collections.singletonList(topicPartition));
+			if (done.compareAndSet(false, true)) {
+				return records;
+			}
+			else {
+				Thread.sleep(500);
+				return null;
+			}
+		}).given(consumer).poll(anyLong());
+		final CountDownLatch seekLatch = new CountDownLatch(2);
+		willAnswer(i -> {
+			seekLatch.countDown();
 			return null;
-		}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
-		ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(topicPartition,
-				Collections.singletonList(new ConsumerRecord<>("foo", 0, 0, "key", "value"))));
+		}).given(consumer).seek(any(), anyLong());
+		ConsumerFactory cf = mock(ConsumerFactory.class);
+		willReturn(consumer).given(cf).createConsumer("group", null);
+		Producer producer = mock(Producer.class);
+		final CountDownLatch closeLatch = new CountDownLatch(1);
+		willAnswer(i -> {
+			closeLatch.countDown();
+			return null;
+		}).given(producer).close();
+		ProducerFactory pf = mock(ProducerFactory.class);
+		given(pf.transactionCapable()).willReturn(true);
+		given(pf.createProducer()).willReturn(producer);
+		KafkaTransactionManager tm = new KafkaTransactionManager(pf);
+		ContainerProperties props = new ContainerProperties(new TopicPartitionInitialOffset("foo", 0),
+				new TopicPartitionInitialOffset("foo", 1));
+		props.setGroupId("group");
+		props.setTransactionManager(tm);
+		final KafkaTemplate template = new KafkaTemplate(pf);
+		props.setMessageListener((MessageListener) m -> {
+			template.send("bar", "baz");
+			throw new RuntimeException("fail");
+		});
+		KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, props);
+		container.setBeanName("rollback");
+		container.start();
+		assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		assertThat(seekLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		InOrder inOrder = inOrder(producer);
+		inOrder.verify(producer).beginTransaction();
+		ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
+		verify(producer).send(captor.capture(), any(Callback.class));
+		assertThat(captor.getValue()).isEqualTo(new ProducerRecord("bar", "baz"));
+		inOrder.verify(producer, never()).sendOffsetsToTransaction(anyMap(), anyString());
+		inOrder.verify(producer, never()).commitTransaction();
+		inOrder.verify(producer).abortTransaction();
+		inOrder.verify(producer).close();
+		verify(consumer).seek(topicPartition0, 0);
+		verify(consumer).seek(topicPartition1, 0);
+		verify(consumer, never()).commitSync(any());
+		container.stop();
+		verify(pf, times(1)).createProducer();
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testConsumeAndProduceTransactionRollbackBatch() throws Exception {
+		Consumer consumer = mock(Consumer.class);
+		final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
+		final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
+		Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = new HashMap<>();
+		recordMap.put(topicPartition0, Collections.singletonList(new ConsumerRecord<>("foo", 0, 0, "key", "value")));
+		recordMap.put(topicPartition1, Collections.singletonList(new ConsumerRecord<>("foo", 1, 0, "key", "value")));
+		ConsumerRecords records = new ConsumerRecords(recordMap);
 		final AtomicBoolean done = new AtomicBoolean();
 		willAnswer(i -> {
 			if (done.compareAndSet(false, true)) {
@@ -179,6 +247,11 @@ public void testConsumeAndProduceTransactionRollback() throws Exception {
 				return null;
 			}
 		}).given(consumer).poll(anyLong());
+		final CountDownLatch seekLatch = new CountDownLatch(2);
+		willAnswer(i -> {
+			seekLatch.countDown();
+			return null;
+		}).given(consumer).seek(any(), anyLong());
 		ConsumerFactory cf = mock(ConsumerFactory.class);
 		willReturn(consumer).given(cf).createConsumer("group", null);
 		Producer producer = mock(Producer.class);
@@ -191,11 +264,12 @@ public void testConsumeAndProduceTransactionRollback() throws Exception {
 		given(pf.transactionCapable()).willReturn(true);
 		given(pf.createProducer()).willReturn(producer);
 		KafkaTransactionManager tm = new KafkaTransactionManager(pf);
-		ContainerProperties props = new ContainerProperties("foo");
+		ContainerProperties props = new ContainerProperties(new TopicPartitionInitialOffset("foo", 0),
+				new TopicPartitionInitialOffset("foo", 1));
 		props.setGroupId("group");
 		props.setTransactionManager(tm);
 		final KafkaTemplate template = new KafkaTemplate(pf);
-		props.setMessageListener((MessageListener) m -> {
+		props.setMessageListener((BatchMessageListener) recordlist -> {
 			template.send("bar", "baz");
 			throw new RuntimeException("fail");
 		});
@@ -203,6 +277,7 @@ public void testConsumeAndProduceTransactionRollback() throws Exception {
 		container.setBeanName("rollback");
 		container.start();
 		assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		assertThat(seekLatch.await(10, TimeUnit.SECONDS)).isTrue();
 		InOrder inOrder = inOrder(producer);
 		inOrder.verify(producer).beginTransaction();
 		ArgumentCaptor captor = ArgumentCaptor.forClass(ProducerRecord.class);
@@ -212,6 +287,9 @@ public void testConsumeAndProduceTransactionRollback() throws Exception {
 		inOrder.verify(producer, never()).commitTransaction();
 		inOrder.verify(producer).abortTransaction();
 		inOrder.verify(producer).close();
+		verify(consumer).seek(topicPartition0, 0);
+		verify(consumer).seek(topicPartition1, 0);
+		verify(consumer, never()).commitSync(any());
 		container.stop();
 		verify(pf, times(1)).createProducer();
 	}
diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc
index 0204fcb5d6..1f864ff38e
100644
--- a/src/reference/asciidoc/kafka.adoc
+++ b/src/reference/asciidoc/kafka.adoc
@@ -1165,6 +1165,17 @@ public interface KafkaListenerErrorHandler {
 As you can see, you have access to the spring-messaging `Message` object produced by the message converter and the exception that was thrown by the listener, wrapped in a `ListenerExecutionFailedException`.
 The error handler can throw the original or a new exception, which will be thrown to the container.
 Anything returned by the error handler is ignored.
 
+[[after-rollback]]
+===== After Rollback Processor
+
+When using transactions, if the listener throws an exception (and an error handler, if present, also throws an exception), the transaction is rolled back.
+By default, any unprocessed records (including the failed record) will be re-fetched on the next poll.
+This is achieved by performing `seek` operations in the `DefaultAfterRollbackProcessor`.
+With a batch listener, the entire batch of records will be reprocessed (the container has no knowledge of which record in the batch failed).
+To modify this behavior, configure the listener container with a custom `AfterRollbackProcessor`.
+For example, with a record-based listener, you might want to keep track of the failed record and give up after some number of attempts - perhaps by publishing it to a dead-letter topic.
+
+
 [[kerberos]]
 ==== Kerberos
 
diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc
index 80a4547a6c..068b5c40ef 100644
--- a/src/reference/asciidoc/whats-new.adoc
+++ b/src/reference/asciidoc/whats-new.adoc
@@ -38,6 +38,10 @@ To restore the previous behavior of using the factory configured `group.id`, set
 For convenience, a test class-level `@EmbeddedKafka` annotation is provided to register `KafkaEmbedded` as a bean.
 See <> for more information.
 
+==== After rollback processing
+
+Starting with _version 1.3.5_, a new `AfterRollbackProcessor` strategy is provided - see <<after-rollback>> for more information.
+
 ==== Kerberos Configuration
 
 Support for configuring Kerberos is now provided.
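As an illustration of the customization suggested in the documentation above, a custom `AfterRollbackProcessor` might track delivery attempts and publish a record to a dead-letter topic once it gives up.
The following is only a sketch: the class name, the `MAX_ATTEMPTS` limit, the `".failures"` topic suffix, and the injected `KafkaTemplate` are illustrative assumptions and not part of this change; it also assumes a non-transactional template and does not commit the offset of the skipped record.

[source, java]
----
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;

public class DeadLetterAfterRollbackProcessor<K, V> implements AfterRollbackProcessor<K, V> {

	private static final int MAX_ATTEMPTS = 3; // illustrative retry limit

	private final DefaultAfterRollbackProcessor<K, V> delegate = new DefaultAfterRollbackProcessor<>();

	// attempts per failed record; unbounded here - a real implementation needs eviction
	private final Map<String, Integer> attempts = new ConcurrentHashMap<>();

	private final KafkaTemplate<K, V> template; // assumed non-transactional

	public DeadLetterAfterRollbackProcessor(KafkaTemplate<K, V> template) {
		this.template = template;
	}

	@Override
	public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer) {
		// the first record in the list is the one that failed
		ConsumerRecord<K, V> failed = records.get(0);
		String key = failed.topic() + "-" + failed.partition() + "@" + failed.offset();
		int count = this.attempts.merge(key, 1, Integer::sum);
		if (count < MAX_ATTEMPTS) {
			// seek all topics/partitions (including the failed record) for redelivery
			this.delegate.process(records, consumer);
		}
		else {
			this.attempts.remove(key);
			// give up: route the bad record to a dead-letter topic and skip over it
			this.template.send(failed.topic() + ".failures", failed.key(), failed.value());
			// seek only for the records after the failed one; note that the skipped
			// record's offset is still not committed by this sketch
			this.delegate.process(records.subList(1, records.size()), consumer);
		}
	}

}
----

A container would then be configured with `container.setAfterRollbackProcessor(new DeadLetterAfterRollbackProcessor<>(template))` (or via the container factory's `setAfterRollbackProcessor`) in place of the default.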