Commit 439f2a5

garyrussell authored and artembilan committed
GH-656: Fix seek on rollback
Fixes #656, #657

Previously, after a rollback, we only performed a `seek` on the failed record. We need to seek for all unprocessed records.

Also, when no error handler was provided, and using a batch listener, the offsets were added to `acks` and incorrectly committed (#657).

Also, if a `ContainerAwareErrorHandler` "handles" the error, the offsets weren't committed.

Enhance the tests to verify full seeks. Add a new test to verify the batch listener doesn't commit after a rollback.

**cherry-pick to 2.1.x, 2.0.x**

I will backport to 1.3.x after review.

* Some simple polishing

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java
#	spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
#	src/reference/asciidoc/whats-new.adoc

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java
#	spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
#	src/reference/asciidoc/kafka.adoc
#	src/reference/asciidoc/whats-new.adoc

* Resolve errors for code which doesn't exist yet
1 parent 21fcb11 commit 439f2a5
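For context, a minimal sketch (not part of this diff; class, bean, and parameter names are illustrative) of the transactional listener setup this fix affects — with a transaction manager on the container, a listener exception rolls the transaction back, and the container must then re-seek every unprocessed record from the poll, not just the failed one:

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.transaction.KafkaTransactionManager;

    @Configuration
    public class ListenerConfig {

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
                ConsumerFactory<String, String> consumerFactory,
                KafkaTransactionManager<String, String> tm) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            // with a transaction manager set, a listener exception rolls the
            // transaction back; after this commit the container then invokes the
            // AfterRollbackProcessor for ALL unprocessed records in the poll
            factory.getContainerProperties().setTransactionManager(tm);
            return factory;
        }

    }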

File tree

9 files changed: +302, -60 lines


Diff for: spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

+18
@@ -25,6 +25,7 @@
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
+import org.springframework.kafka.listener.AfterRollbackProcessor;
 import org.springframework.kafka.listener.BatchErrorHandler;
 import org.springframework.kafka.listener.ErrorHandler;
 import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -72,6 +73,9 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
     private ApplicationEventPublisher applicationEventPublisher;
 
     private KafkaTemplate<K, V> replyTemplate;
+
+    private AfterRollbackProcessor<K, V> afterRollbackProcessor;
+
     /**
      * Specify a {@link ConsumerFactory} to use.
      * @param consumerFactory The consumer factory.
@@ -175,6 +179,17 @@ public void setReplyTemplate(KafkaTemplate<K, V> replyTemplate) {
         this.replyTemplate = replyTemplate;
     }
 
+    /**
+     * Set a processor to invoke after a transaction rollback; typically will
+     * seek the unprocessed topic/partition to reprocess the records.
+     * The default does so, including the failed record.
+     * @param afterRollbackProcessor the processor.
+     * @since 1.3.5
+     */
+    public void setAfterRollbackProcessor(AfterRollbackProcessor<K, V> afterRollbackProcessor) {
+        this.afterRollbackProcessor = afterRollbackProcessor;
+    }
+
     /**
      * Obtain the properties template for this factory - set properties as needed
      * and they will be copied to a final properties instance for the endpoint.
@@ -248,6 +263,9 @@ protected void initializeContainer(C instance) {
         ContainerProperties properties = instance.getContainerProperties();
         BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
                 "messageListener", "ackCount", "ackTime");
+        if (this.afterRollbackProcessor != null) {
+            instance.setAfterRollbackProcessor(this.afterRollbackProcessor);
+        }
         if (this.containerProperties.getAckCount() > 0) {
             properties.setAckCount(this.containerProperties.getAckCount());
         }
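
A minimal usage sketch for the new factory hook (assumption: `factory` is an application's ConcurrentKafkaListenerContainerFactory<String, String>; the logger is illustrative). Because AfterRollbackProcessor is a functional interface, a lambda can decorate the default behavior:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;

    Log log = LogFactory.getLog("rollback");
    factory.setAfterRollbackProcessor((records, consumer) -> {
        // log, then delegate to the default seek-all-unprocessed behavior
        log.warn("Transaction rolled back; re-seeking " + records.size() + " record(s)");
        new DefaultAfterRollbackProcessor<String, String>().process(records, consumer);
    });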

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

+18
@@ -116,6 +116,8 @@ public enum AckMode {
 
     private int phase = DEFAULT_PHASE;
 
+    private AfterRollbackProcessor<K, V> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>();
+
     private volatile boolean running = false;
 
     protected AbstractMessageListenerContainer(ContainerProperties containerProperties) {
@@ -196,6 +198,22 @@ public int getPhase() {
         return this.phase;
     }
 
+    protected AfterRollbackProcessor<K, V> getAfterRollbackProcessor() {
+        return this.afterRollbackProcessor;
+    }
+
+    /**
+     * Set a processor to perform seeks on unprocessed records after a rollback.
+     * The default seeks all topics/partitions to the current position, including
+     * the failed record.
+     * @param afterRollbackProcessor the processor.
+     * @since 1.3.5
+     */
+    public void setAfterRollbackProcessor(AfterRollbackProcessor<K, V> afterRollbackProcessor) {
+        Assert.notNull(afterRollbackProcessor, "'afterRollbackProcessor' cannot be null");
+        this.afterRollbackProcessor = afterRollbackProcessor;
+    }
+
     public ContainerProperties getContainerProperties() {
         return this.containerProperties;
     }
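
A minimal wiring sketch for the new container property (assumption: `cf` is an existing ConsumerFactory<String, String>; the topic name is illustrative). The default processor is shown explicitly, but any AfterRollbackProcessor can be supplied:

    import org.springframework.kafka.listener.KafkaMessageListenerContainer;
    import org.springframework.kafka.listener.config.ContainerProperties;

    ContainerProperties containerProps = new ContainerProperties("someTopic");
    KafkaMessageListenerContainer<String, String> container =
            new KafkaMessageListenerContainer<>(cf, containerProps);
    // replace (or here, restate) the processor invoked after a rollback
    container.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>());
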
Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java

+49

@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Invoked by a listener container with remaining, unprocessed, records
+ * (including the failed record). Implementations should seek the desired
+ * topics/partitions so that records will be re-fetched on the next
+ * poll. When used with a batch listener, the entire batch of records is
+ * provided.
+ *
+ * @param <K> the key type.
+ * @param <V> the value type.
+ *
+ * @author Gary Russell
+ *
+ * @since 1.3.5
+ *
+ */
+@FunctionalInterface
+public interface AfterRollbackProcessor<K, V> {
+
+    /**
+     * Process the remaining records.
+     * @param records the records.
+     * @param consumer the consumer.
+     */
+    void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer);
+
+}
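
Since the contract passes the remaining records with the failed record first (for a record listener), a custom policy is easy to sketch. A hypothetical implementation (not part of this commit) that skips the failed record instead of replaying it:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative class name; for a batch listener the whole batch is passed,
    // so "skip the first record" may not be the right policy there.
    public class SkipFailedAfterRollbackProcessor<K, V> implements AfterRollbackProcessor<K, V> {

        @Override
        public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer) {
            Map<TopicPartition, Long> seeks = new HashMap<>();
            // resume the failed record's partition one past the failure
            ConsumerRecord<K, V> failed = records.get(0);
            seeks.put(new TopicPartition(failed.topic(), failed.partition()), failed.offset() + 1);
            // all other partitions rewind to their first unprocessed offset
            for (ConsumerRecord<K, V> record : records.subList(1, records.size())) {
                seeks.putIfAbsent(new TopicPartition(record.topic(), record.partition()), record.offset());
            }
            seeks.forEach(consumer::seek);
        }

    }
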
Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

+61

@@ -0,0 +1,61 @@
+/*
+ * Copyright 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Default implementation of {@link AfterRollbackProcessor}. Seeks all
+ * topic/partitions so the records will be re-fetched, including the failed
+ * record.
+ *
+ * @param <K> the key type.
+ * @param <V> the value type.
+ *
+ * @author Gary Russell
+ *
+ * @since 1.3.5
+ *
+ */
+public class DefaultAfterRollbackProcessor<K, V> implements AfterRollbackProcessor<K, V> {
+
+    private static final Log logger = LogFactory.getLog(DefaultAfterRollbackProcessor.class);
+
+    @Override
+    public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer) {
+        Map<TopicPartition, Long> partitions = new HashMap<>();
+        records.forEach(r -> partitions.computeIfAbsent(new TopicPartition(r.topic(), r.partition()),
+                offset -> r.offset()));
+        partitions.forEach((topicPartition, offset) -> {
+            try {
+                consumer.seek(topicPartition, offset);
+            }
+            catch (Exception e) {
+                logger.error("Failed to seek " + topicPartition + " to " + offset, e);
+            }
+        });
+    }
+
+}
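
A note on the default implementation: because the remaining records are passed in fetch order, `computeIfAbsent` keeps only the first, i.e. lowest, offset seen for each partition, so each partition is rewound to its earliest unprocessed record. For example (hypothetical offsets), given remaining records t-0@5 (the failed record), t-0@6, and t-1@2, the processor seeks t-0 to offset 5 and t-1 to offset 2, and the next poll re-fetches all three.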

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+54 -51

@@ -800,10 +800,7 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
                 }
                 catch (RuntimeException e) {
                     this.logger.error("Transaction rolled back", e);
-                    Map<TopicPartition, Long> seekOffsets = new HashMap<>();
-                    records.forEach(r -> seekOffsets.computeIfAbsent(new TopicPartition(r.topic(), r.partition()),
-                            v -> r.offset()));
-                    seekOffsets.entrySet().forEach(entry -> this.consumer.seek(entry.getKey(), entry.getValue()));
+                    getAfterRollbackProcessor().process(recordList, this.consumer);
                 }
             }
 
@@ -849,7 +846,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
                 }
             }
             catch (RuntimeException e) {
-                if (this.containerProperties.isAckOnError() && !this.autoCommit) {
+                if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
                     for (ConsumerRecord<K, V> record : getHighestOffsetRecords(recordList)) {
                         this.acks.add(record);
                     }
@@ -859,7 +856,11 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
                 }
                 try {
                     this.batchErrorHandler.handle(e, records, this.consumer);
+                    // if the handler handled the error (no exception), go ahead and commit
                     if (producer != null) {
+                        for (ConsumerRecord<K, V> record : getHighestOffsetRecords(recordList)) {
+                            this.acks.add(record);
+                        }
                         sendOffsetsToTransaction(producer);
                     }
                 }
@@ -919,8 +920,12 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
                 }
                 catch (RuntimeException e) {
                     this.logger.error("Transaction rolled back", e);
-                    this.consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
-                    break;
+                    List<ConsumerRecord<K, V>> unprocessed = new ArrayList<>();
+                    unprocessed.add(record);
+                    while (iterator.hasNext()) {
+                        unprocessed.add(iterator.next());
+                    }
+                    getAfterRollbackProcessor().process(unprocessed, this.consumer);
                 }
             }
         }
@@ -970,52 +975,20 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
                     this.listener.onMessage(record);
                     break;
             }
-            if (this.isRecordAck) {
-                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
-                        Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
-                                new OffsetAndMetadata(record.offset() + 1));
-                if (producer == null) {
-                    if (this.containerProperties.isSyncCommits()) {
-                        this.consumer.commitSync(offsetsToCommit);
-                    }
-                    else {
-                        this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
-                    }
-                }
-                else {
-                    this.acks.add(record);
-                }
-            }
-            else if (!this.isAnyManualAck && !this.autoCommit) {
-                this.acks.add(record);
-            }
-            if (producer != null) {
-                sendOffsetsToTransaction(producer);
-            }
+            ackCurrent(record, producer);
         }
         catch (RuntimeException e) {
             if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
-                if (this.isRecordAck) {
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
-                            Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
-                                    new OffsetAndMetadata(record.offset() + 1));
-                    if (this.containerProperties.isSyncCommits()) {
-                        this.consumer.commitSync(offsetsToCommit);
-                    }
-                    else {
-                        this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
-                    }
-                }
-                else if (!this.isAnyManualAck) {
-                    this.acks.add(record);
-                }
+                ackCurrent(record, producer);
             }
             if (this.errorHandler == null) {
                 throw e;
             }
             try {
                 if (this.errorHandler instanceof RemainingRecordsErrorHandler) {
-                    processCommits();
+                    if (producer == null) {
+                        processCommits();
+                    }
                     List<ConsumerRecord<?, ?>> records = new ArrayList<>();
                     records.add(record);
                     while (iterator.hasNext()) {
@@ -1025,13 +998,13 @@ else if (!this.isAnyManualAck) {
                 }
                 else {
                     this.errorHandler.handle(e, record, this.consumer);
-                    if (producer != null) {
-                        try {
-                            sendOffsetsToTransaction(producer);
-                        }
-                        catch (Exception e1) {
-                            this.logger.error("Send offsets to transaction failed", e1);
-                        }
+                }
+                if (producer != null) {
+                    try {
+                        sendOffsetsToTransaction(producer);
+                    }
+                    catch (Exception e1) {
+                        this.logger.error("Send offsets to transaction failed", e1);
                     }
                 }
             }
@@ -1047,6 +1020,36 @@ else if (!this.isAnyManualAck) {
         return null;
     }
 
+    public void ackCurrent(final ConsumerRecord<K, V> record, @SuppressWarnings("rawtypes") Producer producer) {
+        if (this.isRecordAck) {
+            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+                    Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
+                            new OffsetAndMetadata(record.offset() + 1));
+            if (producer == null) {
+                if (this.containerProperties.isSyncCommits()) {
+                    this.consumer.commitSync(offsetsToCommit);
+                }
+                else {
+                    this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
+                }
+            }
+            else {
+                this.acks.add(record);
+            }
+        }
+        else if (!this.isAnyManualAck && !this.autoCommit) {
+            this.acks.add(record);
+        }
+        if (producer != null) {
+            try {
+                sendOffsetsToTransaction(producer);
+            }
+            catch (Exception e) {
+                this.logger.error("Send offsets to transaction failed", e);
+            }
+        }
+    }
+
     @SuppressWarnings({ "unchecked", "rawtypes" })
     private void sendOffsetsToTransaction(Producer producer) {
         handleAcks();
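
The net effect of the record-listener changes above: `ackCurrent()` centralizes the commit logic, and `sendOffsetsToTransaction()` now runs after the error handler whether or not the handler branch was taken, so a handler that "handles" the error gets the offsets committed. A hedged sketch of how an application-level handler interacts with this (the class name, logger, and wiring are illustrative, not part of this commit):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.listener.ErrorHandler;

    public class SkippingErrorHandler implements ErrorHandler {

        private static final Log logger = LogFactory.getLog(SkippingErrorHandler.class);

        @Override
        public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
            // Returning normally means the error is "handled"; with this fix the
            // container still sends the record's offset to the transaction, so
            // the failed record is skipped rather than left uncommitted.
            logger.error("Skipping failed record " + record, thrownException);
        }

    }

Wiring is assumed to be via the container properties, e.g. `containerProps.setGenericErrorHandler(new SkippingErrorHandler())`.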

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

+1 -1

@@ -514,7 +514,7 @@ public void testAckOnErrorRecord() throws Exception {
             }
         }
         assertThat(consumer.position(new TopicPartition(topic9, 0))).isEqualTo(1);
-        // this consumer is positioned at 1, the next offset after the successfully
+        // this consumer is positioned at 2, the next offset after the successfully
         // processed 'qux'
         // it has been updated even 'baz' failed
         for (int i = 0; i < 100; i++) {
