Skip to content

Commit d7b2f5b

Browse files
Zhiyang.Wang1Wzy19930507
Zhiyang.Wang1
authored andcommitted
GH-2588: support batch recoverable DefaultAfterRollbackProcessor
* add method processBatch at `AfterRollbackProcessor` * add opt-in property `batchRecoverAfterRollback` at `ContainerProperties` * change format to `BatchListenerFailedException.getMessage` * add batch recoverable after rollback unit test
1 parent 3333242 commit d7b2f5b

9 files changed

+337
-65
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020

2121
import org.apache.kafka.clients.consumer.Consumer;
2222
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
import org.apache.kafka.clients.consumer.ConsumerRecords;
2324

2425
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
2526

@@ -63,6 +64,12 @@ public interface AfterRollbackProcessor<K, V> {
6364
void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
6465
MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode);
6566

67+
default void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList,
68+
Consumer<K, V> consumer, MessageListenerContainer container, Exception exception,
69+
boolean recoverable, ContainerProperties.EOSMode eosMode) {
70+
process(recordList, consumer, container, exception, recoverable, eosMode);
71+
}
72+
6673
/**
6774
* Optional method to clear thread state; will be called just before a consumer
6875
* thread terminates.

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchListenerFailedException.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ public int getIndex() {
9898

9999
@Override
100100
public String getMessage() {
101-
return super.getMessage() + (this.record != null
101+
return super.getMessage() + " " + (this.record != null
102102
? (this.record.topic() + "-" + this.record.partition() + "@" + this.record.offset())
103-
: (" @-" + this.index));
103+
: ("@-" + this.index));
104104
}
105105

106106
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

+10
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,8 @@ public enum EOSMode {
258258

259259
private PlatformTransactionManager transactionManager;
260260

261+
private boolean batchRecoverAfterRollback = false;
262+
261263
private int monitorInterval = DEFAULT_MONITOR_INTERVAL;
262264

263265
private TaskScheduler scheduler;
@@ -543,6 +545,14 @@ public void setTransactionManager(@Nullable PlatformTransactionManager transacti
543545
this.transactionManager = transactionManager;
544546
}
545547

548+
public boolean isBatchRecoverAfterRollback() {
549+
return this.batchRecoverAfterRollback;
550+
}
551+
552+
public void setBatchRecoverAfterRollback(boolean batchRecoverAfterRollback) {
553+
this.batchRecoverAfterRollback = batchRecoverAfterRollback;
554+
}
555+
546556
public int getMonitorInterval() {
547557
return this.monitorInterval;
548558
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

+58-1
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,19 @@
1717
package org.springframework.kafka.listener;
1818

1919
import java.util.Collections;
20+
import java.util.HashMap;
2021
import java.util.List;
2122
import java.util.Map;
2223
import java.util.concurrent.ConcurrentHashMap;
2324
import java.util.function.BiConsumer;
2425

2526
import org.apache.kafka.clients.consumer.Consumer;
2627
import org.apache.kafka.clients.consumer.ConsumerRecord;
28+
import org.apache.kafka.clients.consumer.ConsumerRecords;
2729
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2830
import org.apache.kafka.common.TopicPartition;
2931

32+
import org.springframework.kafka.KafkaException;
3033
import org.springframework.kafka.core.KafkaOperations;
3134
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
3235
import org.springframework.lang.Nullable;
@@ -60,7 +63,9 @@ public class DefaultAfterRollbackProcessor<K, V> extends FailedRecordProcessor
6063

6164
private final BackOff backOff;
6265

63-
private KafkaOperations<?, ?> kafkaTemplate;
66+
private final KafkaOperations<?, ?> kafkaTemplate;
67+
68+
private final BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer;
6469

6570
/**
6671
* Construct an instance with the default recoverer which simply logs the record after
@@ -143,6 +148,11 @@ public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>,
143148
super.setCommitRecovered(commitRecovered);
144149
checkConfig();
145150
this.backOff = backOff;
151+
this.recoverer = (crs, ex) -> {
152+
if (recoverer != null && !crs.isEmpty()) {
153+
crs.spliterator().forEachRemaining(rec -> recoverer.accept(rec, ex));
154+
}
155+
};
146156
}
147157

148158
private void checkConfig() {
@@ -176,6 +186,53 @@ && isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
176186

177187
}
178188

189+
@SuppressWarnings({ "unchecked", "rawtypes"})
190+
@Override
191+
public void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList, Consumer<K, V> consumer,
192+
@Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {
193+
194+
if (recoverable && isCommitRecovered()) {
195+
long nextBackOff = ListenerUtils.nextBackOff(this.backOff, this.backOffs);
196+
if (nextBackOff != BackOffExecution.STOP) {
197+
SeekUtils.doSeeksToBegin((List) recordList, consumer, this.logger);
198+
try {
199+
ListenerUtils.stoppableSleep(container, nextBackOff);
200+
}
201+
catch (InterruptedException e) {
202+
Thread.currentThread().interrupt();
203+
}
204+
return;
205+
}
206+
207+
try {
208+
this.recoverer.accept(records, exception);
209+
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
210+
records.forEach(rec -> offsets.put(new TopicPartition(rec.topic(), rec.partition()),
211+
ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1)));
212+
if (offsets.size() > 0 && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
213+
this.kafkaTemplate.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
214+
}
215+
clearThreadState();
216+
}
217+
catch (Exception ex) {
218+
SeekUtils.doSeeksToBegin((List) recordList, consumer, this.logger);
219+
logger.error(ex, () -> "Recoverer threw an exception; re-seeking batch");
220+
throw ex;
221+
}
222+
return;
223+
}
224+
225+
try {
226+
process(recordList, consumer, container, exception, false, eosMode);
227+
}
228+
catch (KafkaException ke) {
229+
ke.selfLog("AfterRollbackProcessor threw an exception", this.logger);
230+
}
231+
catch (Exception ex) {
232+
this.logger.error(ex, "AfterRollbackProcessor threw an exception");
233+
}
234+
}
235+
179236
@Override
180237
public boolean isProcessInTransaction() {
181238
return isCommitRecovered();

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+13-26
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.List;
3333
import java.util.Map;
3434
import java.util.Map.Entry;
35+
import java.util.Objects;
3536
import java.util.Properties;
3637
import java.util.Set;
3738
import java.util.concurrent.BlockingQueue;
@@ -660,6 +661,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
660661

661662
private final boolean wantsFullRecords;
662663

664+
private final boolean wantsBatchRecoverAfterRollback;
665+
663666
private final boolean autoCommit;
664667

665668
private final boolean isManualAck;
@@ -882,6 +885,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
882885

883886
this.clientId = determineClientId();
884887
this.transactionTemplate = determineTransactionTemplate();
888+
this.wantsBatchRecoverAfterRollback = this.containerProperties.isBatchRecoverAfterRollback();
885889
this.genericListener = listener;
886890
this.consumerSeekAwareListener = checkConsumerSeekAware(listener);
887891
this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties,
@@ -2190,37 +2194,20 @@ private void batchRollback(final ConsumerRecords<K, V> records,
21902194

21912195
@Override
21922196
protected void doInTransactionWithoutResult(TransactionStatus status) {
2193-
batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse);
2197+
afterRollbackProcessorToUse.processBatch(records,
2198+
Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)),
2199+
ListenerConsumer.this.consumer,
2200+
KafkaMessageListenerContainer.this.thisOrParentContainer, e,
2201+
ListenerConsumer.this.wantsBatchRecoverAfterRollback, ListenerConsumer.this.eosMode);
21942202
}
21952203

21962204
});
21972205
}
21982206
else {
2199-
batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse);
2200-
}
2201-
}
2202-
2203-
private void batchAfterRollback(final ConsumerRecords<K, V> records,
2204-
@Nullable final List<ConsumerRecord<K, V>> recordList, RuntimeException rollbackException,
2205-
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse) {
2206-
2207-
try {
2208-
if (recordList == null) {
2209-
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer,
2210-
KafkaMessageListenerContainer.this.thisOrParentContainer, rollbackException, false,
2211-
this.eosMode);
2212-
}
2213-
else {
2214-
afterRollbackProcessorToUse.process(recordList, this.consumer,
2215-
KafkaMessageListenerContainer.this.thisOrParentContainer, rollbackException, false,
2216-
this.eosMode);
2217-
}
2218-
}
2219-
catch (KafkaException ke) {
2220-
ke.selfLog("AfterRollbackProcessor threw an exception", this.logger);
2221-
}
2222-
catch (Exception ex) {
2223-
this.logger.error(ex, "AfterRollbackProcessor threw an exception");
2207+
afterRollbackProcessorToUse.processBatch(records,
2208+
Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)), this.consumer,
2209+
KafkaMessageListenerContainer.this.thisOrParentContainer, e,
2210+
this.wantsBatchRecoverAfterRollback, this.eosMode);
22242211
}
22252212
}
22262213

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,7 @@ public static void unrecoverableBackOff(BackOff backOff, Map<Thread, BackOffExec
126126
Map<Thread, Long> lastIntervals, MessageListenerContainer container) throws InterruptedException {
127127

128128
Thread currentThread = Thread.currentThread();
129-
BackOffExecution backOffExecution = executions.get(currentThread);
130-
if (backOffExecution == null) {
131-
backOffExecution = backOff.start();
132-
executions.put(currentThread, backOffExecution);
133-
}
134-
Long interval = backOffExecution.nextBackOff();
129+
Long interval = nextBackOff(backOff, executions);
135130
if (interval == BackOffExecution.STOP) {
136131
interval = lastIntervals.get(currentThread);
137132
if (interval == null) {
@@ -144,6 +139,17 @@ public static void unrecoverableBackOff(BackOff backOff, Map<Thread, BackOffExec
144139
}
145140
}
146141

142+
public static long nextBackOff(BackOff backOff, Map<Thread, BackOffExecution> executions) {
143+
144+
Thread currentThread = Thread.currentThread();
145+
BackOffExecution backOffExecution = executions.get(currentThread);
146+
if (backOffExecution == null) {
147+
backOffExecution = backOff.start();
148+
executions.put(currentThread, backOffExecution);
149+
}
150+
return backOffExecution.nextBackOff();
151+
}
152+
147153
/**
148154
* Sleep for the desired timeout, as long as the container continues to run.
149155
* @param container the container.

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java

+11
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,17 @@ public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?>
133133
return skipped.get();
134134
}
135135

136+
public static void doSeeksToBegin(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
137+
LogAccessor logger) {
138+
139+
Map<TopicPartition, Long> partitions = new LinkedHashMap<>();
140+
records.forEach(record -> {
141+
partitions.computeIfAbsent(new TopicPartition(record.topic(), record.partition()),
142+
offset -> record.offset());
143+
});
144+
seekPartitions(consumer, partitions, logger);
145+
}
146+
136147
/**
137148
* Perform seek operations on each partition.
138149
* @param consumer the consumer.

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java

+14-22
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2022 the original author or authors.
2+
* Copyright 2019-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,14 +27,18 @@
2727
import static org.mockito.Mockito.times;
2828
import static org.mockito.Mockito.verify;
2929

30+
import java.util.ArrayList;
3031
import java.util.Arrays;
32+
import java.util.HashMap;
3133
import java.util.List;
34+
import java.util.Map;
3235
import java.util.concurrent.atomic.AtomicBoolean;
3336
import java.util.concurrent.atomic.AtomicReference;
3437

3538
import org.apache.kafka.clients.consumer.Consumer;
3639
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
3740
import org.apache.kafka.clients.consumer.ConsumerRecord;
41+
import org.apache.kafka.clients.consumer.ConsumerRecords;
3842
import org.apache.kafka.common.TopicPartition;
3943
import org.junit.jupiter.api.Test;
4044
import org.mockito.InOrder;
@@ -100,8 +104,8 @@ void testClassifier() {
100104
}
101105

102106
@Test
103-
void testBatchBackOff() {
104-
AtomicReference<ConsumerRecord<?, ?>> recovered = new AtomicReference<>();
107+
void testBackOffNoBatchRecover() {
108+
105109
@SuppressWarnings("unchecked")
106110
KafkaOperations<String, String> template = mock(KafkaOperations.class);
107111
given(template.isTransactional()).willReturn(true);
@@ -118,37 +122,25 @@ void testBatchBackOff() {
118122
ConsumerRecord<String, String> record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar");
119123
ConsumerRecord<String, String> record2 = new ConsumerRecord<>("foo", 1, 1L, "foo", "bar");
120124
List<ConsumerRecord<String, String>> records = Arrays.asList(record1, record2);
125+
Map<TopicPartition, List<ConsumerRecord<String, String>>> map = new HashMap<>();
126+
records.forEach(rec -> map.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
127+
tp -> new ArrayList<>()).add(rec));
128+
ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(map);
121129
IllegalStateException illegalState = new IllegalStateException();
122130
@SuppressWarnings("unchecked")
123131
Consumer<String, String> consumer = mock(Consumer.class);
124132
given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo"));
125133
MessageListenerContainer container = mock(MessageListenerContainer.class);
126134
given(container.isRunning()).willReturn(true);
127-
processor.process(records, consumer, container, illegalState, false, EOSMode.V2);
128-
processor.process(records, consumer, container, illegalState, false, EOSMode.V2);
135+
processor.processBatch(consumerRecords, records, consumer, container, illegalState, false, EOSMode.V2);
136+
processor.processBatch(consumerRecords, records, consumer, container, illegalState, false, EOSMode.V2);
129137
verify(backOff, times(2)).start();
130138
verify(execution.get(), times(2)).nextBackOff();
131139
processor.clearThreadState();
132-
processor.process(records, consumer, container, illegalState, false, EOSMode.V2);
140+
processor.processBatch(consumerRecords, records, consumer, container, illegalState, false, EOSMode.V2);
133141
verify(backOff, times(3)).start();
134142
}
135143

136-
void testEarlyExitBackOff() {
137-
DefaultAfterRollbackProcessor<String, String> processor = new DefaultAfterRollbackProcessor<>(
138-
new FixedBackOff(1, 10_000));
139-
@SuppressWarnings("unchecked")
140-
Consumer<String, String> consumer = mock(Consumer.class);
141-
ConsumerRecord<String, String> record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar");
142-
ConsumerRecord<String, String> record2 = new ConsumerRecord<>("foo", 1, 1L, "foo", "bar");
143-
List<ConsumerRecord<String, String>> records = Arrays.asList(record1, record2);
144-
IllegalStateException illegalState = new IllegalStateException();
145-
MessageListenerContainer container = mock(MessageListenerContainer.class);
146-
given(container.isRunning()).willReturn(false);
147-
long t1 = System.currentTimeMillis();
148-
processor.process(records, consumer, container, illegalState, true, EOSMode.V2);
149-
assertThat(System.currentTimeMillis() < t1 + 5_000);
150-
}
151-
152144
@Test
153145
void testNoEarlyExitBackOff() {
154146
DefaultAfterRollbackProcessor<String, String> processor = new DefaultAfterRollbackProcessor<>(

0 commit comments

Comments
 (0)