
Commit 83e3a45

garyrussell authored and artembilan committed
GH-656: Fix seek on rollback
Fixes #656, Fixes #657

Previously, after a rollback, we only performed a `seek` on the failed record. We need to seek for all unprocessed records.

Also, when no error handler was provided and a batch listener was being used, the offsets were added to `acks` and incorrectly committed (#657).

Also, if a `ContainerAwareErrorHandler` "handles" the error, the offsets weren't committed.

Enhance the tests to verify the full seeks.

Add a new test to verify that the batch listener doesn't commit after a rollback.

**cherry-pick to 2.1.x, 2.0.x**

I will backport to 1.3.x after review.

* Some simple polishing
1 parent f79848b commit 83e3a45
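
For context, a minimal sketch of the scenario this commit fixes (not part of the commit; the topic name, listener body, and process() helper are hypothetical, and a KafkaTransactionManager is assumed to be configured so that a listener exception rolls the transaction back):

@KafkaListener(topics = "txTopic")
public void listen(ConsumerRecord<String, String> record) {
    if (record.value().startsWith("fail")) {
        // rolls back the transaction; previously only this record was
        // re-seeked, so unprocessed records from the same poll in other
        // partitions could be skipped. Now all of them are re-seeked.
        throw new IllegalStateException("force rollback");
    }
    process(record); // hypothetical business logic
}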

File tree

9 files changed: +304 -63 lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

+18

@@ -25,6 +25,7 @@
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
+import org.springframework.kafka.listener.AfterRollbackProcessor;
 import org.springframework.kafka.listener.BatchErrorHandler;
 import org.springframework.kafka.listener.ContainerProperties;
 import org.springframework.kafka.listener.ErrorHandler;
@@ -77,6 +78,9 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
     private ApplicationEventPublisher applicationEventPublisher;

     private KafkaTemplate<K, V> replyTemplate;
+
+    private AfterRollbackProcessor<K, V> afterRollbackProcessor;
+
     /**
      * Specify a {@link ConsumerFactory} to use.
      * @param consumerFactory The consumer factory.
@@ -212,6 +216,17 @@ public void setBatchErrorHandler(BatchErrorHandler errorHandler) {
         this.errorHandler = errorHandler;
     }

+    /**
+     * Set a processor to invoke after a transaction rollback; typically will
+     * seek the unprocessed topic/partition to reprocess the records.
+     * The default does so, including the failed record.
+     * @param afterRollbackProcessor the processor.
+     * @since 1.3.5
+     */
+    public void setAfterRollbackProcessor(AfterRollbackProcessor<K, V> afterRollbackProcessor) {
+        this.afterRollbackProcessor = afterRollbackProcessor;
+    }
+
     /**
      * Obtain the properties template for this factory - set properties as needed
      * and they will be copied to a final properties instance for the endpoint.
@@ -289,6 +304,9 @@ protected void initializeContainer(C instance) {
         ContainerProperties properties = instance.getContainerProperties();
         BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
                 "messageListener", "ackCount", "ackTime");
+        if (this.afterRollbackProcessor != null) {
+            instance.setAfterRollbackProcessor(this.afterRollbackProcessor);
+        }
         if (this.containerProperties.getAckCount() > 0) {
             properties.setAckCount(this.containerProperties.getAckCount());
         }
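
A sketch of how an application might wire the new hook through a factory (the bean wiring and generic types are assumptions; when the property is left unset, the container's built-in DefaultAfterRollbackProcessor applies):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // copied to each container by initializeContainer() above
    factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>());
    return factory;
}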

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

+18

@@ -71,6 +71,8 @@ public abstract class AbstractMessageListenerContainer<K, V>

     private int phase = DEFAULT_PHASE;

+    private AfterRollbackProcessor<K, V> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>();
+
     private volatile boolean running = false;

     private volatile boolean paused;
@@ -210,6 +212,22 @@ public int getPhase() {
         return this.phase;
     }

+    protected AfterRollbackProcessor<K, V> getAfterRollbackProcessor() {
+        return this.afterRollbackProcessor;
+    }
+
+    /**
+     * Set a processor to perform seeks on unprocessed records after a rollback.
+     * Default will seek to current position all topics/partitions, including the failed
+     * record.
+     * @param afterRollbackProcessor the processor.
+     * @since 1.3.5
+     */
+    public void setAfterRollbackProcessor(AfterRollbackProcessor<K, V> afterRollbackProcessor) {
+        Assert.notNull(afterRollbackProcessor, "'afterRollbackProcessor' cannot be null");
+        this.afterRollbackProcessor = afterRollbackProcessor;
+    }
+
     @Override
     public ContainerProperties getContainerProperties() {
         return this.containerProperties;
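
The same property can be set when a container is built programmatically; a sketch (the topic, listener body, and consumerFactory are placeholders). Passing null trips the Assert.notNull guard shown above:

ContainerProperties containerProps = new ContainerProperties("someTopic");
containerProps.setMessageListener((MessageListener<String, String>) r -> handle(r)); // handle() is hypothetical
KafkaMessageListenerContainer<String, String> container =
        new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
// overrides the DefaultAfterRollbackProcessor default; must not be null
container.setAfterRollbackProcessor((records, consumer) -> {
    // custom seek logic
});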
Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java (new file)

+49

@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Invoked by a listener container with remaining, unprocessed, records
+ * (including the failed record). Implementations should seek the desired
+ * topics/partitions so that records will be re-fetched on the next
+ * poll. When used with a batch listener, the entire batch of records is
+ * provided.
+ *
+ * @param <K> the key type.
+ * @param <V> the value type.
+ *
+ * @author Gary Russell
+ *
+ * @since 1.3.5
+ *
+ */
+@FunctionalInterface
+public interface AfterRollbackProcessor<K, V> {
+
+    /**
+     * Process the remaining records.
+     * @param records the records.
+     * @param consumer the consumer.
+     */
+    void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer);
+
+}
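
Since this is a @FunctionalInterface, a custom processor can be a lambda. A sketch (not from the commit, reusing the imports above plus java.util.LinkedHashMap) that skips the failed record instead of replaying it; note this trades redelivery for dropping the failed record:

AfterRollbackProcessor<String, String> skipFailed = (records, consumer) -> {
    // records.get(0) is the failed record; collect the earliest remaining
    // offset per partition (records within a partition are in offset order)
    Map<TopicPartition, Long> earliest = new LinkedHashMap<>();
    for (int i = 1; i < records.size(); i++) {
        ConsumerRecord<String, String> r = records.get(i);
        earliest.putIfAbsent(new TopicPartition(r.topic(), r.partition()), r.offset());
    }
    // if nothing else is pending in the failed record's partition, step past it
    ConsumerRecord<String, String> failed = records.get(0);
    earliest.putIfAbsent(new TopicPartition(failed.topic(), failed.partition()), failed.offset() + 1);
    earliest.forEach(consumer::seek);
};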
Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java (new file)

+61

@@ -0,0 +1,61 @@
+/*
+ * Copyright 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Default implementation of {@link AfterRollbackProcessor}. Seeks all
+ * topic/partitions so the records will be re-fetched, including the failed
+ * record.
+ *
+ * @param <K> the key type.
+ * @param <V> the value type.
+ *
+ * @author Gary Russell
+ *
+ * @since 1.3.5
+ *
+ */
+public class DefaultAfterRollbackProcessor<K, V> implements AfterRollbackProcessor<K, V> {
+
+    private static final Log logger = LogFactory.getLog(DefaultAfterRollbackProcessor.class);
+
+    @Override
+    public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer) {
+        Map<TopicPartition, Long> partitions = new HashMap<>();
+        records.forEach(r -> partitions.computeIfAbsent(new TopicPartition(r.topic(), r.partition()),
+                offset -> r.offset()));
+        partitions.forEach((topicPartition, offset) -> {
+            try {
+                consumer.seek(topicPartition, offset);
+            }
+            catch (Exception e) {
+                logger.error("Failed to seek " + topicPartition + " to " + offset);
+            }
+        });
+    }
+
+}
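
Note the design choice in process(): computeIfAbsent keeps the first offset seen for each partition, which is the lowest one, because records within a partition arrive in offset order. A quick sanity sketch using kafka-clients' MockConsumer (the topic name and offsets are arbitrary assumptions):

MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
TopicPartition tp = new TopicPartition("topic", 0);
consumer.assign(Collections.singletonList(tp));
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
new DefaultAfterRollbackProcessor<String, String>().process(Arrays.asList(
        new ConsumerRecord<>("topic", 0, 5L, "k", "failed"),
        new ConsumerRecord<>("topic", 0, 6L, "k", "pending")), consumer);
// the partition is rewound to the failed record's offset:
// consumer.position(tp) == 5, so both records are re-fetched on the next poll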

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+55 -53

@@ -905,10 +905,7 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
             }
             catch (RuntimeException e) {
                 this.logger.error("Transaction rolled back", e);
-                Map<TopicPartition, Long> seekOffsets = new HashMap<>();
-                records.forEach(r -> seekOffsets.computeIfAbsent(new TopicPartition(r.topic(), r.partition()),
-                        v -> r.offset()));
-                seekOffsets.entrySet().forEach(entry -> this.consumer.seek(entry.getKey(), entry.getValue()));
+                getAfterRollbackProcessor().process(recordList, this.consumer);
             }
         }

@@ -954,7 +951,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
                 }
             }
             catch (RuntimeException e) {
-                if (this.containerProperties.isAckOnError() && !this.autoCommit) {
+                if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
                     for (ConsumerRecord<K, V> record : getHighestOffsetRecords(recordList)) {
                         this.acks.add(record);
                     }
@@ -970,7 +967,11 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
                 else {
                     this.batchErrorHandler.handle(e, records, this.consumer);
                 }
+                // if the handler handled the error (no exception), go ahead and commit
                 if (producer != null) {
+                    for (ConsumerRecord<K, V> record : getHighestOffsetRecords(recordList)) {
+                        this.acks.add(record);
+                    }
                     sendOffsetsToTransaction(producer);
                 }
             }
@@ -1030,8 +1031,12 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
             }
             catch (RuntimeException e) {
                 this.logger.error("Transaction rolled back", e);
-                this.consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
-                break;
+                List<ConsumerRecord<K, V>> unprocessed = new ArrayList<>();
+                unprocessed.add(record);
+                while (iterator.hasNext()) {
+                    unprocessed.add(iterator.next());
+                }
+                getAfterRollbackProcessor().process(unprocessed, this.consumer);
             }
         }
     }
@@ -1081,54 +1086,20 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
                     this.listener.onMessage(record);
                     break;
             }
-            if (this.isRecordAck) {
-                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
-                        Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
-                                new OffsetAndMetadata(record.offset() + 1));
-                if (producer == null) {
-                    this.commitLogger.log(() -> "Committing: " + offsetsToCommit);
-                    if (this.containerProperties.isSyncCommits()) {
-                        this.consumer.commitSync(offsetsToCommit);
-                    }
-                    else {
-                        this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
-                    }
-                }
-                else {
-                    this.acks.add(record);
-                }
-            }
-            else if (!this.isAnyManualAck && !this.autoCommit) {
-                this.acks.add(record);
-            }
-            if (producer != null) {
-                sendOffsetsToTransaction(producer);
-            }
+            ackCurrent(record, producer);
         }
         catch (RuntimeException e) {
             if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
-                if (this.isRecordAck) {
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
-                            Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
-                                    new OffsetAndMetadata(record.offset() + 1));
-                    this.commitLogger.log(() -> "Committing: " + offsetsToCommit);
-                    if (this.containerProperties.isSyncCommits()) {
-                        this.consumer.commitSync(offsetsToCommit);
-                    }
-                    else {
-                        this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
-                    }
-                }
-                else if (!this.isAnyManualAck) {
-                    this.acks.add(record);
-                }
+                ackCurrent(record, producer);
            }
            if (this.errorHandler == null) {
                throw e;
            }
            try {
                if (this.errorHandler instanceof ContainerAwareErrorHandler) {
-                    processCommits();
+                    if (producer == null) {
+                        processCommits();
+                    }
                    List<ConsumerRecord<?, ?>> records = new ArrayList<>();
                    records.add(record);
                    while (iterator.hasNext()) {
@@ -1139,13 +1110,13 @@ else if (!this.isAnyManualAck) {
                    }
                }
                else {
                    this.errorHandler.handle(e, record, this.consumer);
-                    if (producer != null) {
-                        try {
-                            sendOffsetsToTransaction(producer);
-                        }
-                        catch (Exception e1) {
-                            this.logger.error("Send offsets to transaction failed", e1);
-                        }
+                }
+                if (producer != null) {
+                    try {
+                        sendOffsetsToTransaction(producer);
+                    }
+                    catch (Exception e1) {
+                        this.logger.error("Send offsets to transaction failed", e1);
                    }
                }
@@ -1161,6 +1132,37 @@ else if (!this.isAnyManualAck) {
        return null;
    }

+    public void ackCurrent(final ConsumerRecord<K, V> record, @SuppressWarnings("rawtypes") Producer producer) {
+        if (this.isRecordAck) {
+            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+                    Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
+                            new OffsetAndMetadata(record.offset() + 1));
+            if (producer == null) {
+                this.commitLogger.log(() -> "Committing: " + offsetsToCommit);
+                if (this.containerProperties.isSyncCommits()) {
+                    this.consumer.commitSync(offsetsToCommit);
+                }
+                else {
+                    this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
+                }
+            }
+            else {
+                this.acks.add(record);
+            }
+        }
+        else if (!this.isAnyManualAck && !this.autoCommit) {
+            this.acks.add(record);
+        }
+        if (producer != null) {
+            try {
+                sendOffsetsToTransaction(producer);
+            }
+            catch (Exception e) {
+                this.logger.error("Send offsets to transaction failed", e);
+            }
+        }
+    }
+
    @SuppressWarnings({ "unchecked", "rawtypes" })
    private void sendOffsetsToTransaction(Producer producer) {
        handleAcks();
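
One consequence of the last two hunks worth calling out: whether or not the error handler is container-aware, if it returns normally ("handles" the error), the offsets are now sent to the transaction. A sketch of a swallow-and-log handler that exercises that path (the lambda targets ErrorHandler's single abstract handle(Exception, ConsumerRecord) method; the logger is illustrative):

factory.setErrorHandler((thrownException, record) -> {
    // returning normally counts as "handled"; with a transactional producer
    // the container then calls sendOffsetsToTransaction(), committing the offset
    logger.warn("Skipping " + record, thrownException);
});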

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

+1 -1

@@ -514,7 +514,7 @@ public void testAckOnErrorRecord() throws Exception {
             }
         }
         assertThat(consumer.position(new TopicPartition(topic9, 0))).isEqualTo(1);
-        // this consumer is positioned at 1, the next offset after the successfully
+        // this consumer is positioned at 2, the next offset after the successfully
         // processed 'qux'
         // it has been updated even 'baz' failed
         for (int i = 0; i < 100; i++) {
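
The corrected comment reflects Kafka's committed-offset convention, visible in the OffsetAndMetadata(record.offset() + 1) calls in the main diff: the committed value is the offset of the next record to fetch, not of the last record processed. So after 'qux' at offset 1 is processed:

long quxOffset = 1L;            // last successfully processed record
long committed = quxOffset + 1; // 2: the next offset the consumer will fetch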
