
Commit 630dbb2

GH-2588: Batch recovery support in ARBP
* support a batch-recoverable `DefaultAfterRollbackProcessor`
* add a `processBatch` method to `AfterRollbackProcessor`
* add the opt-in property `batchRecoverAfterRollback` to `ContainerProperties`
* change the format of `BatchListenerFailedException.getMessage`
* add a batch-recoverable after-rollback unit test
* review fixes
* update `whats-new.adoc` and `annotation-error-handling.adoc`
* add javadoc to `SeekUtils` and `AfterRollbackProcessor`
* change `ListenerUtils.nextBackOff` from public to package-private
* change logger args to a static string
* add `@author` tags
* fix adoc
* polish `AfterRollbackProcessor`
* add javadoc to `ContainerProperties`
* address review comments and fix a test bug in `DefaultAfterRollbackProcessorTests`
* add `@Test` to `DefaultAfterRollbackProcessorTests.testNoEarlyExitBackOff`
* polish `TransactionalContainerTests`
* fix test bugs in `DefaultAfterRollbackProcessorTests.testNoEarlyExitBackOff` and `testEarlyExitBackOff`
1 parent 0c8e9b4 commit 630dbb2

11 files changed, +395 -58 lines

Diff for: spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc

+4 -1
@@ -451,7 +451,10 @@ AfterRollbackProcessor<String, String> processor =
 When you do not use transactions, you can achieve similar functionality by configuring a `DefaultErrorHandler`.
 See xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Handlers].
 
-IMPORTANT: Recovery is not possible with a batch listener, since the framework has no knowledge about which record in the batch keeps failing.
+Starting with version 3.2, recovery can now recover (skip) the entire batch of records that keeps failing.
+Set `ContainerProperties.setBatchRecoverAfterRollback(true)` to enable this feature.
+
+IMPORTANT: By default, recovery is not possible with a batch listener, since the framework has no knowledge about which record in the batch keeps failing.
 In such cases, the application listener must handle a record that keeps failing.
 
 See also xref:kafka/annotation-error-handling.adoc#dead-letters[Publishing Dead-letter Records].
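
As a hedged illustration of the new opt-in (not part of the commit; the topic name and the `kafkaTransactionManager` variable are assumptions), batch recovery after rollback is enabled on the container properties:

    ContainerProperties containerProps = new ContainerProperties("orders"); // topic name is illustrative
    containerProps.setTransactionManager(kafkaTransactionManager);          // after-rollback processing applies to transactional containers
    containerProps.setBatchRecoverAfterRollback(true);                      // opt in; the default remains false (record-by-record behavior)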

Diff for: spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

+7 -1
@@ -26,4 +26,10 @@ See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more inf
 It's now possible to redirect messages to the custom DLTs based on the type of the exception, which has been thrown during the message processing.
 Rules for the redirection are set either via the `RetryableTopic.exceptionBasedDltRouting` or the `RetryTopicConfigurationBuilder.dltRoutingRules`.
 Custom DLTs are created automatically as well as other retry and dead-letter topics.
-See xref:retrytopic/features.adoc#exc-based-custom-dlt-routing[Routing of messages to custom DLTs based on thrown exceptions] for more information.
+See xref:retrytopic/features.adoc#exc-based-custom-dlt-routing[Routing of messages to custom DLTs based on thrown exceptions] for more information.
+
+[[x32-after-rollback-processing]]
+=== After Rollback Processing
+
+A new `AfterRollbackProcessor` API `processBatch` is provided.
+See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Processor] for more information.

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java

+23 -1
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-2021 the original author or authors.
+ * Copyright 2018-2024 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 
 import org.springframework.kafka.listener.ContainerProperties.EOSMode;
 
@@ -34,6 +35,7 @@
  * @param <V> the value type.
  *
  * @author Gary Russell
+ * @author Wang Zhiyang
  *
  * @since 1.3.5
  *
@@ -63,6 +65,26 @@ public interface AfterRollbackProcessor<K, V> {
 	void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
 			MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode);
 
+	/**
+	 * Process the entire batch of records.
+	 * Recoverable will be true if the container is processing the entire batch of records.
+	 * @param records the records.
+	 * @param recordList the record list.
+	 * @param consumer the consumer.
+	 * @param container the container.
+	 * @param exception the exception.
+	 * @param recoverable the recoverable.
+	 * @param eosMode the {@link EOSMode}.
+	 * @since 3.2
+	 * @see #isProcessInTransaction()
+	 */
+	default void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList,
+			Consumer<K, V> consumer, MessageListenerContainer container, Exception exception,
+			boolean recoverable, ContainerProperties.EOSMode eosMode) {
+
+		process(recordList, consumer, container, exception, recoverable, eosMode);
+	}
+
 	/**
 	 * Optional method to clear thread state; will be called just before a consumer
 	 * thread terminates.
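
A minimal sketch of overriding the new `processBatch` hook; the class name, the logger, and the logging-only behavior are assumptions for illustration, not part of the commit. It delegates to the superclass so the standard recovery logic still runs.

    // Sketch only: logs the rolled-back batch, then defers to the default batch handling.
    import java.util.List;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    import org.springframework.core.log.LogAccessor;
    import org.springframework.kafka.listener.ContainerProperties;
    import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
    import org.springframework.kafka.listener.MessageListenerContainer;

    public class LoggingAfterRollbackProcessor<K, V> extends DefaultAfterRollbackProcessor<K, V> {

        private static final LogAccessor LOGGER = new LogAccessor(LoggingAfterRollbackProcessor.class);

        @Override
        public void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList,
                Consumer<K, V> consumer, MessageListenerContainer container, Exception exception,
                boolean recoverable, ContainerProperties.EOSMode eosMode) {

            LOGGER.warn(() -> "Rolled back a batch of " + records.count() + " record(s)");
            super.processBatch(records, recordList, consumer, container, exception, recoverable, eosMode);
        }

    }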

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/BatchListenerFailedException.java

+3 -2
@@ -26,6 +26,7 @@
  * failed.
  *
  * @author Gary Russell
+ * @author Wang Zhiyang
  * @since 2.5
  *
  */
@@ -98,9 +99,9 @@ public int getIndex() {
 
 	@Override
 	public String getMessage() {
-		return super.getMessage() + (this.record != null
+		return super.getMessage() + " " + (this.record != null
 				? (this.record.topic() + "-" + this.record.partition() + "@" + this.record.offset())
-				: (" @-" + this.index));
+				: ("@-" + this.index));
 	}
 
 }
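
For reference, a small sketch of the message format produced by the updated `getMessage()`; the topic, offset, and index values here are illustrative only:

    // Illustrative values; shows the new formatting (space before the coordinates, no leading space before "@-").
    ConsumerRecord<String, String> failed = new ConsumerRecord<>("orders", 0, 42L, "key", "value");

    new BatchListenerFailedException("Listener failed", failed).getMessage();
    // -> "Listener failed orders-0@42"

    new BatchListenerFailedException("Listener failed", 3).getMessage();
    // -> "Listener failed @-3"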

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

+21 -0
@@ -51,6 +51,7 @@
  * @author Johnny Lim
  * @author Lukasz Kaminski
  * @author Kyuhyeok Park
+ * @author Wang Zhiyang
  */
 public class ContainerProperties extends ConsumerProperties {
 
@@ -258,6 +259,8 @@ public enum EOSMode {
 
 	private PlatformTransactionManager transactionManager;
 
+	private boolean batchRecoverAfterRollback = false;
+
 	private int monitorInterval = DEFAULT_MONITOR_INTERVAL;
 
 	private TaskScheduler scheduler;
@@ -543,6 +546,24 @@ public void setTransactionManager(@Nullable PlatformTransactionManager transacti
 		this.transactionManager = transactionManager;
 	}
 
+	/**
+	 * Recover batch records after rollback if true.
+	 * @return true to recover.
+	 * @since 3.2
+	 */
+	public boolean isBatchRecoverAfterRollback() {
+		return this.batchRecoverAfterRollback;
+	}
+
+	/**
+	 * Enable batch recovery after rollback.
+	 * @param batchRecoverAfterRollback the batchRecoverAfterRollback to set.
+	 * @since 3.2
+	 */
+	public void setBatchRecoverAfterRollback(boolean batchRecoverAfterRollback) {
+		this.batchRecoverAfterRollback = batchRecoverAfterRollback;
+	}
+
 	public int getMonitorInterval() {
 		return this.monitorInterval;
 	}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

+60 -2
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-2023 the original author or authors.
+ * Copyright 2018-2024 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,16 +17,19 @@
 package org.springframework.kafka.listener;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiConsumer;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
+import org.springframework.kafka.KafkaException;
 import org.springframework.kafka.core.KafkaOperations;
 import org.springframework.kafka.listener.ContainerProperties.EOSMode;
 import org.springframework.lang.Nullable;
@@ -47,6 +50,7 @@
 *
 * @author Gary Russell
 * @author Francois Rosiere
+* @author Wang Zhiyang
 *
 * @since 1.3.5
 *
@@ -60,7 +64,9 @@ public class DefaultAfterRollbackProcessor<K, V> extends FailedRecordProcessor
 
	private final BackOff backOff;
 
-	private KafkaOperations<?, ?> kafkaTemplate;
+	private final KafkaOperations<?, ?> kafkaTemplate;
+
+	private final BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer;
 
	/**
	 * Construct an instance with the default recoverer which simply logs the record after
@@ -143,6 +149,11 @@ public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>,
		super.setCommitRecovered(commitRecovered);
		checkConfig();
		this.backOff = backOff;
+		this.recoverer = (crs, ex) -> {
+			if (recoverer != null && !crs.isEmpty()) {
+				crs.spliterator().forEachRemaining(rec -> recoverer.accept(rec, ex));
+			}
+		};
	}
 
	private void checkConfig() {
@@ -176,6 +187,53 @@ && isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
 
	}
 
+	@SuppressWarnings({ "unchecked", "rawtypes"})
+	@Override
+	public void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList, Consumer<K, V> consumer,
+			@Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {
+
+		if (recoverable && isCommitRecovered()) {
+			long nextBackOff = ListenerUtils.nextBackOff(this.backOff, this.backOffs);
+			if (nextBackOff != BackOffExecution.STOP) {
+				SeekUtils.doSeeksToBegin((List) recordList, consumer, this.logger);
+				try {
+					ListenerUtils.stoppableSleep(container, nextBackOff);
+				}
+				catch (InterruptedException e) {
+					Thread.currentThread().interrupt();
+				}
+				return;
+			}
+
+			try {
+				this.recoverer.accept(records, exception);
+				Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+				records.forEach(rec -> offsets.put(new TopicPartition(rec.topic(), rec.partition()),
+						ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1)));
+				if (offsets.size() > 0 && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
+					this.kafkaTemplate.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
+				}
+				clearThreadState();
+			}
+			catch (Exception ex) {
+				SeekUtils.doSeeksToBegin((List) recordList, consumer, this.logger);
+				logger.error(ex, "Recoverer threw an exception; re-seeking batch");
+				throw ex;
+			}
+			return;
+		}
+
+		try {
+			process(recordList, consumer, container, exception, false, eosMode);
+		}
+		catch (KafkaException ke) {
+			ke.selfLog("AfterRollbackProcessor threw an exception", this.logger);
+		}
+		catch (Exception ex) {
+			this.logger.error(ex, "AfterRollbackProcessor threw an exception");
+		}
+	}
+
	@Override
	public boolean isProcessInTransaction() {
		return isCommitRecovered();
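
A minimal wiring sketch, assuming a transactional `KafkaTemplate` and a batch `@KafkaListener`; the bean method name, generic types, dead-letter recoverer, and back-off values are assumptions for illustration, not taken from the commit:

    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
    import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
    import org.springframework.kafka.transaction.KafkaTransactionManager;
    import org.springframework.util.backoff.FixedBackOff;

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
            ConsumerFactory<String, String> consumerFactory,
            KafkaTemplate<String, String> template,
            KafkaTransactionManager<String, String> tm) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        factory.getContainerProperties().setTransactionManager(tm);
        // Opt in to whole-batch recovery after a transaction rollback (new in 3.2).
        factory.getContainerProperties().setBatchRecoverAfterRollback(true);

        // Recover (here: dead-letter) the batch once the back-off is exhausted;
        // commitRecovered=true requires a transactional KafkaTemplate.
        DefaultAfterRollbackProcessor<String, String> arbp = new DefaultAfterRollbackProcessor<>(
                new DeadLetterPublishingRecoverer(template), new FixedBackOff(1000L, 2), template, true);
        factory.setAfterRollbackProcessor(arbp);
        return factory;
    }

With a configuration along these lines, a batch that keeps failing across the configured attempts is recovered as a whole (for example dead-lettered) instead of being redelivered indefinitely.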

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+16 -24
@@ -32,6 +32,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
@@ -662,6 +663,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
 
	private final boolean wantsFullRecords;
 
+	private final boolean wantsBatchRecoverAfterRollback;
+
	private final boolean asyncReplies;
 
	private final boolean autoCommit;
@@ -888,6 +891,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
 
		this.clientId = determineClientId();
		this.transactionTemplate = determineTransactionTemplate();
+		this.wantsBatchRecoverAfterRollback = this.containerProperties.isBatchRecoverAfterRollback();
		this.genericListener = listener;
		this.consumerSeekAwareListener = checkConsumerSeekAware(listener);
		this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties,
@@ -2195,38 +2199,26 @@ private void batchRollback(final ConsumerRecords<K, V> records,
 
				@Override
				protected void doInTransactionWithoutResult(TransactionStatus status) {
-					batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse);
+					afterRollbackProcessorToUse.processBatch(records,
+							Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)),
+							ListenerConsumer.this.consumer,
+							KafkaMessageListenerContainer.this.thisOrParentContainer, e,
+							ListenerConsumer.this.wantsBatchRecoverAfterRollback, ListenerConsumer.this.eosMode);
				}
 
			});
		}
		else {
-			batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse);
-		}
-	}
-
-	private void batchAfterRollback(final ConsumerRecords<K, V> records,
-			@Nullable final List<ConsumerRecord<K, V>> recordList, RuntimeException rollbackException,
-			AfterRollbackProcessor<K, V> afterRollbackProcessorToUse) {
-
-		try {
-			if (recordList == null) {
-				afterRollbackProcessorToUse.process(createRecordList(records), this.consumer,
-						KafkaMessageListenerContainer.this.thisOrParentContainer, rollbackException, false,
-						this.eosMode);
+			try {
+				afterRollbackProcessorToUse.processBatch(records,
+						Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)), this.consumer,
+						KafkaMessageListenerContainer.this.thisOrParentContainer, e,
+						this.wantsBatchRecoverAfterRollback, this.eosMode);
			}
-			else {
-				afterRollbackProcessorToUse.process(recordList, this.consumer,
-						KafkaMessageListenerContainer.this.thisOrParentContainer, rollbackException, false,
-						this.eosMode);
+			catch (Exception ex) {
+				this.logger.error(ex, "AfterRollbackProcessor threw exception");
			}
		}
-		catch (KafkaException ke) {
-			ke.selfLog("AfterRollbackProcessor threw an exception", this.logger);
-		}
-		catch (Exception ex) {
-			this.logger.error(ex, "AfterRollbackProcessor threw an exception");
-		}
	}
 
	private List<ConsumerRecord<K, V>> createRecordList(final ConsumerRecords<K, V> records) {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

+14 -7
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2023 the original author or authors.
+ * Copyright 2017-2024 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
 * @author Gary Russell
 * @author Francois Rosiere
 * @author Antonio Tomac
+* @author Wang Zhiyang
 * @since 2.0
 *
 */
@@ -126,12 +127,7 @@ public static void unrecoverableBackOff(BackOff backOff, Map<Thread, BackOffExec
			Map<Thread, Long> lastIntervals, MessageListenerContainer container) throws InterruptedException {
 
		Thread currentThread = Thread.currentThread();
-		BackOffExecution backOffExecution = executions.get(currentThread);
-		if (backOffExecution == null) {
-			backOffExecution = backOff.start();
-			executions.put(currentThread, backOffExecution);
-		}
-		Long interval = backOffExecution.nextBackOff();
+		Long interval = nextBackOff(backOff, executions);
		if (interval == BackOffExecution.STOP) {
			interval = lastIntervals.get(currentThread);
			if (interval == null) {
@@ -144,6 +140,17 @@ public static void unrecoverableBackOff(BackOff backOff, Map<Thread, BackOffExec
		}
	}
 
+	static long nextBackOff(BackOff backOff, Map<Thread, BackOffExecution> executions) {
+
+		Thread currentThread = Thread.currentThread();
+		BackOffExecution backOffExecution = executions.get(currentThread);
+		if (backOffExecution == null) {
+			backOffExecution = backOff.start();
+			executions.put(currentThread, backOffExecution);
+		}
+		return backOffExecution.nextBackOff();
+	}
+
	/**
	 * Sleep for the desired timeout, as long as the container continues to run.
	 * @param container the container.
