Skip to content

Commit 6978f2d

Browse files
committed
review fix
* `what-new.adoc` and `annotation-error-handling.adoc` * add javadoc in `SeekUtils` and `AfterRollbackProcessor` * change `ListenerUtils.nextBackOff` public to default
1 parent d7b2f5b commit 6978f2d

File tree

5 files changed

+33
-5
lines changed

5 files changed

+33
-5
lines changed

Diff for: spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc

+5-1
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,11 @@ AfterRollbackProcessor<String, String> processor =
451451
When you do not use transactions, you can achieve similar functionality by configuring a `DefaultErrorHandler`.
452452
See xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Handlers].
453453

454-
IMPORTANT: Recovery is not possible with a batch listener, since the framework has no knowledge about which record in the batch keeps failing.
454+
Starting with version 3.2, Recovery can now recover (skip) entire batch of records that keeps failing.
455+
Set `ContainerProperties.setBatchRecoverAfterRollback(true)` to enable this feature.
456+
457+
IMPORTANT: Default behavior, recovery is not possible with a batch listener,
458+
since the framework has no knowledge about which record in the batch keeps failing.
455459
In such cases, the application listener must handle a record that keeps failing.
456460

457461
See also xref:kafka/annotation-error-handling.adoc#dead-letters[Publishing Dead-letter Records].

Diff for: spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

+5
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,8 @@
77
This section covers the changes made from version 3.1 to version 3.2.
88
For changes in earlier version, see xref:appendix/change-history.adoc[Change History].
99

10+
[[x32-after-rollback-processing]]
11+
=== After Rollback Processing
12+
13+
A new `AfterRollbackProcessor` API `processBatch` is provided.
14+
See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Processor] for more information.

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -64,6 +64,19 @@ public interface AfterRollbackProcessor<K, V> {
6464
void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
6565
MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode);
6666

67+
/**
68+
* Process the entire batch of records.
69+
* Recoverable will be true if the container is processing entire batch of records;
70+
* @param records the records.
71+
* @param recordList the record list.
72+
* @param consumer the consumer.
73+
* @param container the container.
74+
* @param exception the exception
75+
* @param recoverable the recoverable.
76+
* @param eosMode the {@link EOSMode}.
77+
* @since 3.2
78+
* @see #isProcessInTransaction()
79+
*/
6780
default void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList,
6881
Consumer<K, V> consumer, MessageListenerContainer container, Exception exception,
6982
boolean recoverable, ContainerProperties.EOSMode eosMode) {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2023 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -139,7 +139,7 @@ public static void unrecoverableBackOff(BackOff backOff, Map<Thread, BackOffExec
139139
}
140140
}
141141

142-
public static long nextBackOff(BackOff backOff, Map<Thread, BackOffExecution> executions) {
142+
static long nextBackOff(BackOff backOff, Map<Thread, BackOffExecution> executions) {
143143

144144
Thread currentThread = Thread.currentThread();
145145
BackOffExecution backOffExecution = executions.get(currentThread);

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -133,6 +133,12 @@ public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?>
133133
return skipped.get();
134134
}
135135

136+
/**
137+
* Seek records to begin position, optionally skipping the first.
138+
* @param records the records.
139+
* @param consumer the consumer.
140+
* @param logger a {@link LogAccessor} for seek errors.
141+
*/
136142
public static void doSeeksToBegin(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
137143
LogAccessor logger) {
138144

0 commit comments

Comments
 (0)