
Commit ce9d7c6

Author: Zhiyang.Wang1 (committed)

GH-2588: ARBP support BatchListenerFailedException

* Support retry and recover in `DefaultAfterRollbackProcessor.processBatch`
* Add `nextBackOff` to `ListenerUtils`
* Fix the `KafkaHeaders.DLT_ORIGINAL_OFFSET` assertion in `TransactionalContainerTests.testMaxFailures`

Resolves #2588
1 parent ec7cde6 commit ce9d7c6
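
As context for the diffs below, here is a minimal configuration sketch of the behavior this commit enables (the factory, template, and method names in the sketch are illustrative assumptions, not part of the commit): a transactional batch listener whose DefaultAfterRollbackProcessor retries the whole batch per a BackOff and, once exhausted, recovers the failed record and commits the recovered offsets to the transaction.

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.util.backoff.FixedBackOff;

public class ArbpBatchSketch {

    void configure(ConcurrentKafkaListenerContainerFactory<Integer, String> factory,
            KafkaTemplate<Integer, String> template) {

        // Retry the failed batch twice, one second apart; afterwards the recoverer
        // receives the records up to and including the failed one, and their offsets
        // are sent to the transaction so they are not redelivered.
        DefaultAfterRollbackProcessor<Integer, String> arbp = new DefaultAfterRollbackProcessor<>(
                (rec, ex) -> System.err.println("Discarding " + rec + " due to " + ex),
                new FixedBackOff(1000L, 2), template, true);
        factory.setAfterRollbackProcessor(arbp);
    }

    // In the batch listener, report which record failed so that processBatch can
    // split the batch at that index after the transaction rolls back.
    void listen(List<ConsumerRecord<Integer, String>> records) {
        for (int i = 0; i < records.size(); i++) {
            try {
                handle(records.get(i)); // hypothetical per-record business logic
            }
            catch (Exception e) {
                throw new BatchListenerFailedException("record " + i + " failed", e, i);
            }
        }
    }

    private void handle(ConsumerRecord<Integer, String> record) {
        // application logic (assumption)
    }
}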

File tree (6 files changed: +103 -52 lines)

- spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java
- spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java
- spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
- spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java
- spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java
- spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java


Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java (+62 -29)

@@ -32,8 +32,10 @@

 import org.springframework.kafka.core.KafkaOperations;
 import org.springframework.kafka.listener.ContainerProperties.EOSMode;
+import org.springframework.kafka.support.KafkaUtils;
 import org.springframework.lang.Nullable;
 import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
 import org.springframework.util.backoff.BackOff;
 import org.springframework.util.backoff.BackOffExecution;

@@ -63,7 +65,9 @@ public class DefaultAfterRollbackProcessor<K, V> extends FailedBatchProcessor

     private final BackOff backOff;

-    private KafkaOperations<?, ?> kafkaTemplate;
+    private final KafkaOperations<?, ?> kafkaTemplate;
+
+    private final BiConsumer<List<ConsumerRecord<?, ?>>, Exception> recoverer;

     /**
      * Construct an instance with the default recoverer which simply logs the record after
@@ -140,20 +144,29 @@ public DefaultAfterRollbackProcessor(@Nullable
     public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer,
             BackOff backOff, @Nullable BackOffHandler backOffHandler, @Nullable KafkaOperations<?, ?> kafkaOperations,
             boolean commitRecovered) {
-
-        super(recoverer, backOff, backOffHandler, null);
+        super(recoverer, backOff, backOffHandler, new CommonErrorHandler() { });
         this.kafkaTemplate = kafkaOperations;
         super.setCommitRecovered(commitRecovered);
         checkConfig();
         this.backOff = backOff;
+        this.recoverer = (crs, ex) -> {
+            if (!CollectionUtils.isEmpty(crs)) {
+                if (recoverer == null) {
+                    this.logger.error(ex, () -> "Records discarded: " + recordsToString(crs));
+                }
+                else {
+                    crs.spliterator().forEachRemaining(rec -> recoverer.accept(rec, ex));
+                }
+            }
+        };
     }

     private void checkConfig() {
         Assert.isTrue(!isCommitRecovered() || this.kafkaTemplate != null,
                 "A KafkaOperations is required when 'commitRecovered' is true");
     }

-    @SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
     public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
             @Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {
@@ -179,42 +192,52 @@ && isCommitRecovered() && this.kafkaTemplate.isTransactional()) {

     }

+    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
     public void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList, Consumer<K, V> consumer,
             @Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {

-        int index = handlerBatchListenerFailedException(exception, records);
-        if (index > -1) {
-            List<ConsumerRecord<?, ?>> toCommit = new ArrayList<>();
-            List<ConsumerRecord<?, ?>> remaining = new ArrayList<>();
-            for (ConsumerRecord<?, ?> datum : records) {
-                if (index-- > 0) {
-                    toCommit.add(datum);
-                }
-                else {
-                    remaining.add(datum);
-                }
+        long nextBackOff = ListenerUtils.nextBackOff(this.backOff, this.backOffs);
+        if (nextBackOff != BackOffExecution.STOP) {
+            SeekUtils.doSeeks((List) recordList, consumer, exception, recoverable,
+                    getFailureTracker()::recovered, container, this.logger);
+            try {
+                ListenerUtils.stoppableSleep(container, nextBackOff);
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+            return;
+        }
+
+        int indexArg = handlerBatchListenerFailedException(exception, records);
+        int index = indexArg > -1 ? indexArg + 1 : recordList.size();
+        List<ConsumerRecord<?, ?>> toRecover = new ArrayList<>();
+        List<ConsumerRecord<?, ?>> remaining = new ArrayList<>();
+        for (ConsumerRecord<?, ?> datum : records) {
+            if (index-- > 0) {
+                toRecover.add(datum);
            }
+            else {
+                remaining.add(datum);
            }
+        }
+
+        try {
+            this.recoverer.accept(toRecover, exception);
+            SeekUtils.doSeeks(remaining, consumer, exception, recoverable,
+                    getFailureTracker()::recovered, container, this.logger);
             Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-            toCommit.forEach(rec -> offsets.compute(new TopicPartition(rec.topic(), rec.partition()),
+            toRecover.forEach(rec -> offsets.compute(new TopicPartition(rec.topic(), rec.partition()),
                     (key, val) -> ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1)));
             if (offsets.size() > 0 && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
                 this.kafkaTemplate.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
             }
-
-            // skip first record
-            if (SeekUtils.doSeeks(remaining, consumer, exception, recoverable,
-                    getFailureTracker()::recovered, container, this.logger)
-                    && isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
-                ConsumerRecord<?, ?> skipped = remaining.get(0);
-                this.kafkaTemplate.sendOffsetsToTransaction(
-                        Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
-                                createOffsetAndMetadata(container, skipped.offset() + 1)
-                        ), consumer.groupMetadata());
-            }
+            clearThreadState();
         }
-        else {
-            process(recordList, consumer, container, exception, false, eosMode);
+        catch (Exception ex) {
+            process(recordList, consumer, container, exception, recoverable, eosMode);
+            logger.error(ex, () -> "Recoverer threw an exception; re-seeking batch");
         }

     }
@@ -238,4 +261,14 @@ private static OffsetAndMetadata createOffsetAndMetadata(@Nullable MessageListen
         }
         return ListenerUtils.createOffsetAndMetadata(container, offset);
     }
+
+    private static String recordsToString(List<ConsumerRecord<?, ?>> records) {
+        StringBuffer sb = new StringBuffer();
+        records.spliterator().forEachRemaining(rec -> sb
+                .append(KafkaUtils.format(rec))
+                .append(','));
+        sb.deleteCharAt(sb.length() - 1);
+        return sb.toString();
+    }
+
 }
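
To make the new index arithmetic concrete, here is a small self-contained sketch (plain Java with placeholder strings instead of ConsumerRecords) of how processBatch now partitions the polled batch: everything up to and including the reported index goes to the recoverer, the rest is re-seeked; when no index is available, the whole batch is recovered.

import java.util.ArrayList;
import java.util.List;

public class BatchSplitSketch {

    public static void main(String[] args) {
        List<String> batch = List.of("r0", "r1", "r2", "r3", "r4");
        int indexArg = 2; // as reported by BatchListenerFailedException; -1 if unknown

        // Mirrors processBatch: recover up to and including the failed record.
        int index = indexArg > -1 ? indexArg + 1 : batch.size();
        List<String> toRecover = new ArrayList<>();
        List<String> remaining = new ArrayList<>();
        for (String datum : batch) {
            if (index-- > 0) {
                toRecover.add(datum);
            }
            else {
                remaining.add(datum);
            }
        }
        System.out.println(toRecover); // [r0, r1, r2] -> recovered, offsets committed
        System.out.println(remaining); // [r3, r4]     -> seeked for redelivery
    }
}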

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java (+1 -1)

@@ -181,7 +181,7 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
                 .stream()
                 .collect(
                         Collectors.toMap(tp -> tp,
-                                tp -> data.records(tp).get(0).offset(), (u, v) -> (long) v, LinkedHashMap::new))
+                                tp -> data.records(tp).get(0).offset(), (u, v) -> v, LinkedHashMap::new))
                 .forEach(consumer::seek);

         throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
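
The one-line change above only drops a redundant cast: both arguments of the merge function are already Long, and the merge function can never fire because each TopicPartition key is unique. For reference, a self-contained sketch of the seek-map idiom used there (the comments are explanatory additions, not part of the commit):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class SeekMapSketch {

    // Build the seek-back map the handler uses: for each partition in the failed
    // batch, the offset of its first record, so the whole batch is redelivered
    // after the back-off. LinkedHashMap preserves encounter order; the merge
    // function is required by this toMap overload but is never invoked.
    static Map<TopicPartition, Long> seekMap(ConsumerRecords<?, ?> data) {
        return data.partitions()
                .stream()
                .collect(Collectors.toMap(tp -> tp,
                        tp -> data.records(tp).get(0).offset(), (u, v) -> v, LinkedHashMap::new));
    }
}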

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java (+2 -2)

@@ -2183,7 +2183,7 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
                     afterRollbackProcessorToUse.processBatch(records,
                             Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)),
                             ListenerConsumer.this.consumer,
-                            KafkaMessageListenerContainer.this.thisOrParentContainer, e, true,
+                            KafkaMessageListenerContainer.this.thisOrParentContainer, e, false,
                             ListenerConsumer.this.eosMode);
                 }

@@ -2201,7 +2201,7 @@ private void batchAfterRollback(final ConsumerRecords<K, V> records,
             try {
                 afterRollbackProcessorToUse.processBatch(records,
                         Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)), this.consumer,
-                        KafkaMessageListenerContainer.this.thisOrParentContainer, rollbackException, true,
+                        KafkaMessageListenerContainer.this.thisOrParentContainer, rollbackException, false,
                         this.eosMode);
             }
             catch (KafkaException ke) {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java (+12 -6)

@@ -126,12 +126,7 @@ public static void unrecoverableBackOff(BackOff backOff, Map<Thread, BackOffExec
             Map<Thread, Long> lastIntervals, MessageListenerContainer container) throws InterruptedException {

         Thread currentThread = Thread.currentThread();
-        BackOffExecution backOffExecution = executions.get(currentThread);
-        if (backOffExecution == null) {
-            backOffExecution = backOff.start();
-            executions.put(currentThread, backOffExecution);
-        }
-        Long interval = backOffExecution.nextBackOff();
+        Long interval = nextBackOff(backOff, executions);
         if (interval == BackOffExecution.STOP) {
             interval = lastIntervals.get(currentThread);
             if (interval == null) {
@@ -144,6 +139,17 @@ public static void unrecoverableBackOff(BackOff backOff, Map<Thread, BackOffExec
         }
     }

+    public static long nextBackOff(BackOff backOff, Map<Thread, BackOffExecution> executions) {
+
+        Thread currentThread = Thread.currentThread();
+        BackOffExecution backOffExecution = executions.get(currentThread);
+        if (backOffExecution == null) {
+            backOffExecution = backOff.start();
+            executions.put(currentThread, backOffExecution);
+        }
+        return backOffExecution.nextBackOff();
+    }
+
     /**
      * Sleep for the desired timeout, as long as the container continues to run.
      * @param container the container.
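
A minimal sketch of the per-thread caching that the new nextBackOff method factors out, using Spring's FixedBackOff (the sleep and map cleanup around the call are illustrative assumptions; the container itself uses stoppableSleep() and clearThreadState()):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.FixedBackOff;

public class NextBackOffSketch {

    public static void main(String[] args) throws InterruptedException {
        // One BackOffExecution per listener thread, so concurrent threads back off independently.
        Map<Thread, BackOffExecution> executions = new ConcurrentHashMap<>();
        BackOff backOff = new FixedBackOff(1000L, 3); // three retries, one second apart

        long interval = ListenerUtils.nextBackOff(backOff, executions);
        if (interval != BackOffExecution.STOP) {
            Thread.sleep(interval); // back off, then re-seek and retry the batch
        }
        else {
            executions.remove(Thread.currentThread()); // exhausted: recover instead
        }
    }
}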

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java (+10 -6)

@@ -130,14 +130,18 @@ void testBatchBackOff() {
         Consumer<String, String> consumer = mock(Consumer.class);
         given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo"));
         MessageListenerContainer container = mock(MessageListenerContainer.class);
+        ContainerProperties containerProperties = mock(ContainerProperties.class);
+        given(container.getContainerProperties()).willReturn(containerProperties);
         given(container.isRunning()).willReturn(true);
-        processor.processBatch(consumerRecords, records, consumer, container, illegalState, true, EOSMode.V2);
-        processor.processBatch(consumerRecords, records, consumer, container, illegalState, true, EOSMode.V2);
+        processor.processBatch(consumerRecords, records, consumer, container, illegalState, false, EOSMode.V2);
+        processor.processBatch(consumerRecords, records, consumer, container, illegalState, false, EOSMode.V2);
         verify(backOff, times(2)).start();
         verify(execution.get(), times(2)).nextBackOff();
-        processor.clearThreadState();
-        processor.processBatch(consumerRecords, records, consumer, container, illegalState, true, EOSMode.V2);
+        processor.processBatch(consumerRecords, records, consumer, container, illegalState, false, EOSMode.V2);
         verify(backOff, times(3)).start();
+        processor.clearThreadState();
+        processor.processBatch(consumerRecords, records, consumer, container, illegalState, false, EOSMode.V2);
+        verify(backOff, times(4)).start();

     }

@@ -173,10 +177,10 @@ void testBatchListenerFailedException() {
         given(container.getContainerProperties()).willReturn(containerProperties);
         given(container.isRunning()).willReturn(true);
         BatchListenerFailedException batchFailed = new BatchListenerFailedException("", 2);
-        processor.processBatch(consumerRecords, records, consumer, container, batchFailed, true, EOSMode.V2);
+        processor.processBatch(consumerRecords, records, consumer, container, batchFailed, false, EOSMode.V2);
         verify(consumer).seek(new TopicPartition("foo", 1), 1);
         BatchListenerFailedException batchFailed2 = new BatchListenerFailedException("", record1);
-        processor.processBatch(consumerRecords, records, consumer, container, batchFailed2, true, EOSMode.V2);
+        processor.processBatch(consumerRecords, records, consumer, container, batchFailed2, false, EOSMode.V2);
         verify(consumer).seek(new TopicPartition("foo", 0), 0);
         verify(consumer, times(2)).seek(new TopicPartition("foo", 1), 1);
     }

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java (+16 -8)

@@ -37,6 +37,7 @@
 import static org.mockito.Mockito.verifyNoMoreInteractions;

 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;

@@ -762,7 +763,7 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
         assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull();
         assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_STACKTRACE, byte[].class))
                 .contains("fail for max failures".getBytes());
-        assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)[3]).isEqualTo((byte) 0);
+        assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)[7]).isEqualTo((byte) 0);
         assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)[3]).isEqualTo((byte) 0);
         assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP, byte[].class)).isNotNull();
         assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE, byte[].class)).isNotNull();

@@ -862,11 +863,18 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
         container.stop();
         Consumer<Integer, String> consumer = cf.createConsumer();
         embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic8DLT);
-        ConsumerRecord<Integer, String> dltRecord = KafkaTestUtils.getSingleRecord(consumer, topic8DLT);
-        assertThat(dltRecord.value()).isEqualTo("bar");
+        ConsumerRecords<Integer, String> dltRecords = KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(60));
+        List<ConsumerRecord<Integer, String>> recordList = new ArrayList<>();
+        for (ConsumerRecord<Integer, String> record : dltRecords) {
+            recordList.add(record);
+        }
+        ConsumerRecord<Integer, String> dltRecord0 = recordList.get(0);
+        assertThat(dltRecord0.value()).isEqualTo("foo");
+        ConsumerRecord<Integer, String> dltRecord1 = recordList.get(1);
+        assertThat(dltRecord1.value()).isEqualTo("bar");
         DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
         Map<String, Object> map = new HashMap<>();
-        mapper.toHeaders(dltRecord.headers(), map);
+        mapper.toHeaders(dltRecord1.headers(), map);
         MessageHeaders headers = new MessageHeaders(map);
         assertThat(new String(headers.get(KafkaHeaders.DLT_EXCEPTION_FQCN, byte[].class)))
                 .contains("ListenerExecutionFailedException");

@@ -877,7 +885,7 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
         assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull();
         assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_STACKTRACE, byte[].class))
                 .contains("fail for max failures".getBytes());
-        assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)[3]).isEqualTo((byte) 0);
+        assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)[7]).isEqualTo((byte) 1);
         assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)[3]).isEqualTo((byte) 0);
         assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP, byte[].class)).isNotNull();
         assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE, byte[].class)).isNotNull();

@@ -892,10 +900,10 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
         assertThat(captor.getValue()).isInstanceOf(ListenerExecutionFailedException.class)
                 .extracting(ex -> ((ListenerExecutionFailedException) ex).getGroupId())
                 .isEqualTo(group);
-        verify(afterRollbackProcessor).clearThreadState();
-        verify(dlTemplate).send(any(ProducerRecord.class));
+        verify(afterRollbackProcessor, times(2)).clearThreadState();
+        verify(dlTemplate, times(2)).send(any(ProducerRecord.class));
         verify(dlTemplate).sendOffsetsToTransaction(
-                eq(Collections.singletonMap(new TopicPartition(topic8, 0), new OffsetAndMetadata(1L))),
+                eq(Collections.singletonMap(new TopicPartition(topic8, 0), new OffsetAndMetadata(2L))),
                 any(ConsumerGroupMetadata.class));
         logger.info("Stop testBatchListenerMaxFailures");
     }
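
The [3] -> [7] changes in the offset assertions above follow from the width of the serialized header: KafkaHeaders.DLT_ORIGINAL_OFFSET carries the record's offset as an 8-byte big-endian long, so its low-order byte sits at index 7, whereas DLT_ORIGINAL_PARTITION is a 4-byte int whose low-order byte sits at index 3. A quick sketch of the encoding these assertions assume (big-endian, java.nio.ByteBuffer's default):

import java.nio.ByteBuffer;

public class HeaderByteSketch {

    public static void main(String[] args) {
        byte[] offsetHeader = ByteBuffer.allocate(Long.BYTES).putLong(1L).array();     // offset 1
        byte[] partitionHeader = ByteBuffer.allocate(Integer.BYTES).putInt(0).array(); // partition 0

        System.out.println(offsetHeader[7]);    // 1 -> matches the DLT_ORIGINAL_OFFSET [7] assertion
        System.out.println(partitionHeader[3]); // 0 -> matches the DLT_ORIGINAL_PARTITION [3] assertion
    }
}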
