GH-656: Fix seek on rollback [Backport] #659

Closed
wants to merge 1 commit into from
@@ -24,6 +24,7 @@
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.converter.MessageConverter;
@@ -68,6 +69,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe

private ApplicationEventPublisher applicationEventPublisher;

private AfterRollbackProcessor<K, V> afterRollbackProcessor;

/**
* Specify a {@link ConsumerFactory} to use.
* @param consumerFactory The consumer factory.
@@ -162,6 +165,17 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
this.applicationEventPublisher = applicationEventPublisher;
}

/**
* Set a processor to invoke after a transaction rollback; typically will
* seek the unprocessed topics/partitions so that the records are reprocessed.
* The default does so, including the failed record.
* @param afterRollbackProcessor the processor.
* @since 1.3.5
*/
public void setAfterRollbackProcessor(AfterRollbackProcessor<K, V> afterRollbackProcessor) {
this.afterRollbackProcessor = afterRollbackProcessor;
}

/**
* Obtain the properties template for this factory - set properties as needed
* and they will be copied to a final properties instance for the endpoint.
@@ -232,6 +246,9 @@ protected void initializeContainer(C instance) {
ContainerProperties properties = instance.getContainerProperties();
BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
"messageListener", "ackCount", "ackTime");
if (this.afterRollbackProcessor != null) {
instance.setAfterRollbackProcessor(this.afterRollbackProcessor);
}
if (this.containerProperties.getAckCount() > 0) {
properties.setAckCount(this.containerProperties.getAckCount());
}
@@ -115,6 +115,8 @@ public enum AckMode {

private int phase = DEFAULT_PHASE;

private AfterRollbackProcessor<K, V> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>();

private volatile boolean running = false;

protected AbstractMessageListenerContainer(ContainerProperties containerProperties) {
@@ -189,6 +191,22 @@ public int getPhase() {
return this.phase;
}

protected AfterRollbackProcessor<K, V> getAfterRollbackProcessor() {
return this.afterRollbackProcessor;
}

/**
* Set a processor to perform seeks on unprocessed records after a rollback.
* The default seeks all unprocessed topics/partitions so that the records, including
* the failed record, are re-fetched.
* @param afterRollbackProcessor the processor.
* @since 1.3.5
*/
public void setAfterRollbackProcessor(AfterRollbackProcessor<K, V> afterRollbackProcessor) {
Assert.notNull(afterRollbackProcessor, "'afterRollbackProcessor' cannot be null");
this.afterRollbackProcessor = afterRollbackProcessor;
}

public ContainerProperties getContainerProperties() {
return this.containerProperties;
}
@@ -0,0 +1,49 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
* Invoked by a listener container with remaining, unprocessed, records
* (including the failed record). Implementations should seek the desired
* topics/partitions so that records will be re-fetched on the next
* poll. When used with a batch listener, the entire batch of records is
* provided.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*
* @since 1.3.5
*
*/
@FunctionalInterface
Member

This is not going to work on Java 7, I guess.

Or will be ignored at runtime...

public interface AfterRollbackProcessor<K, V> {

/**
* Process the remaining records.
* @param records the records.
* @param consumer the consumer.
*/
void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer);

}
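
For illustration only (not part of this diff): a minimal sketch of a custom AfterRollbackProcessor that seeks past the failed record instead of replaying it. The class name SkipFailedAfterRollbackProcessor and its package are hypothetical, and the sketch assumes the record-listener case, where the failed record is the first entry in the list passed to process().

package com.example.kafka; // hypothetical package, for the sketch only

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.listener.AfterRollbackProcessor;

public class SkipFailedAfterRollbackProcessor<K, V> implements AfterRollbackProcessor<K, V> {

	@Override
	public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer) {
		Map<TopicPartition, Long> seekOffsets = new HashMap<>();
		boolean failed = true; // assumes the failed record is first in the list
		for (ConsumerRecord<K, V> record : records) {
			TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
			// seek past the failed record; for all other partitions keep the first (earliest) offset seen
			long offset = failed ? record.offset() + 1 : record.offset();
			failed = false;
			if (!seekOffsets.containsKey(topicPartition)) {
				seekOffsets.put(topicPartition, offset);
			}
		}
		for (Map.Entry<TopicPartition, Long> entry : seekOffsets.entrySet()) {
			consumer.seek(entry.getKey(), entry.getValue());
		}
	}

}

Seeking to record.offset() + 1 on the failed record's partition means that record is not redelivered; everything after it is re-fetched on the next poll.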
@@ -0,0 +1,69 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

/**
* Default implementation of {@link AfterRollbackProcessor}. Seeks all
* topic/partitions so the records will be re-fetched, including the failed
* record.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*
* @since 1.3.5
*
*/
public class DefaultAfterRollbackProcessor<K, V> implements AfterRollbackProcessor<K, V> {

private static final Log logger = LogFactory.getLog(DefaultAfterRollbackProcessor.class);

@Override
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer) {
Map<TopicPartition, Long> seekOffsets = new HashMap<>();
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
while (iterator.hasNext()) {
ConsumerRecord<K, V> record = iterator.next();
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
if (!seekOffsets.containsKey(topicPartition)) {
seekOffsets.put(topicPartition, record.offset());
}
}
for (Entry<TopicPartition, Long> entry : seekOffsets.entrySet()) {
Member

Why do we need this extra loop if we can just perform the seek during the first one, skipping already processed partitions?

Contributor Author

It seems much more complicated to me to keep track of which ones we've already seeked.

Member

Well, I mean something like this:

Set<TopicPartition> seekOffsets = new HashSet<>();
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
while (iterator.hasNext()) {
	ConsumerRecord<K, V> record = iterator.next();
	TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
	if (seekOffsets.add(topicPartition)) {
		try {
			consumer.seek(topicPartition, record.offset());
		}
		catch (Exception e) {
			logger.error("Failed to seek " + entry.getKey() + " to " + entry.getValue());
		}
	}
}

No? What am I missing?

Thanks

Contributor Author

I don't see a big difference and I doubt the performance will be any better - it has to calculate hash codes instead of performing the second loop - but feel free to change it if you insist.

Member

it has to calculate hash codes

???

Don't you do that already with the current HashMap and seekOffsets.containsKey()?

Contributor Author

Doh, of course; ok. Feel free to change it.

try {
consumer.seek(entry.getKey(), entry.getValue());
}
catch (Exception e) {
logger.error("Failed to seek " + entry.getKey() + " to " + entry.getValue());
}
}
}

}
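
A sketch of how the new setter might be used from application configuration; the configuration class and bean names below are illustrative, not part of this change. ConcurrentKafkaListenerContainerFactory extends the abstract factory shown above, so it inherits setAfterRollbackProcessor().

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;

@Configuration
public class KafkaListenerConfig { // hypothetical configuration class

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
			ConsumerFactory<String, String> consumerFactory) {
		ConcurrentKafkaListenerContainerFactory<String, String> factory =
				new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory);
		// New in 1.3.5: processor invoked after a transaction rollback.
		// The default is set explicitly here; a custom implementation can be supplied instead.
		factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<String, String>());
		return factory;
	}

}

With this in place, every container created by the factory receives the configured processor through initializeContainer(), as shown in the first hunk of this diff.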
@@ -804,18 +804,7 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
}
catch (RuntimeException e) {
this.logger.error("Transaction rolled back", e);
Map<TopicPartition, Long> seekOffsets = new HashMap<>();
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
while (iterator.hasNext()) {
ConsumerRecord<K, V> record = iterator.next();
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
if (!seekOffsets.containsKey(topicPartition)) {
seekOffsets.put(topicPartition, record.offset());
}
}
for (Entry<TopicPartition, Long> entry : seekOffsets.entrySet()) {
this.consumer.seek(entry.getKey(), entry.getValue());
}
getAfterRollbackProcessor().process(recordList, this.consumer);
}
}

@@ -850,7 +839,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
}
}
catch (RuntimeException e) {
if (this.containerProperties.isAckOnError() && !this.autoCommit) {
if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
for (ConsumerRecord<K, V> record : getHighestOffsetRecords(recordList)) {
this.acks.add(record);
}
@@ -860,7 +849,11 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
}
try {
this.batchErrorHandler.handle(e, records);
// if the handler handled the error (no exception), go ahead and commit
if (producer != null) {
for (ConsumerRecord<K, V> record : getHighestOffsetRecords(recordList)) {
this.acks.add(record);
}
sendOffsetsToTransaction(producer);
}
}
@@ -920,8 +913,12 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
}
catch (RuntimeException e) {
this.logger.error("Transaction rolled back", e);
this.consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
break;
List<ConsumerRecord<K, V>> unprocessed = new ArrayList<>();
unprocessed.add(record);
while (iterator.hasNext()) {
unprocessed.add(iterator.next());
}
getAfterRollbackProcessor().process(unprocessed, this.consumer);
}
}
}
@@ -957,45 +954,11 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
else {
this.listener.onMessage(record);
}
if (this.isRecordAck) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
if (producer == null) {
if (this.containerProperties.isSyncCommits()) {
this.consumer.commitSync(offsetsToCommit);
}
else {
this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
}
}
else {
this.acks.add(record);
}
}
else if (!this.isAnyManualAck && !this.autoCommit) {
this.acks.add(record);
}
if (producer != null) {
sendOffsetsToTransaction(producer);
}
ackCurrent(record, producer);
}
catch (RuntimeException e) {
if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
if (this.isRecordAck) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
if (this.containerProperties.isSyncCommits()) {
this.consumer.commitSync(offsetsToCommit);
}
else {
this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
}
}
else if (!this.isAnyManualAck) {
this.acks.add(record);
}
ackCurrent(record, producer);
}
if (this.errorHandler == null) {
throw e;
@@ -1023,6 +986,39 @@ else if (!this.isAnyManualAck) {
return null;
}

public void ackCurrent(final ConsumerRecord<K, V> record, @SuppressWarnings("rawtypes") Producer producer) {
if (this.isRecordAck) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
if (producer == null) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Committing: " + offsetsToCommit);
}
if (this.containerProperties.isSyncCommits()) {
this.consumer.commitSync(offsetsToCommit);
}
else {
this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
}
}
else {
this.acks.add(record);
}
}
else if (!this.isAnyManualAck && !this.autoCommit) {
this.acks.add(record);
}
if (producer != null) {
try {
sendOffsetsToTransaction(producer);
}
catch (Exception e) {
this.logger.error("Send offsets to transaction failed", e);
}
}
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private void sendOffsetsToTransaction(Producer producer) {
handleAcks();
@@ -513,7 +513,7 @@ public void testAckOnErrorRecord() throws Exception {
}
}
assertThat(consumer.position(new TopicPartition(topic9, 0))).isEqualTo(1);
// this consumer is positioned at 1, the next offset after the successfully
// this consumer is positioned at 2, the next offset after the successfully
// processed 'qux'
// it has been updated even 'baz' failed
for (int i = 0; i < 100; i++) {