Skip to content

Commit af56eca

Browse files
authored
GH-2195: DefaultErrorHandler Improvements
Resolves #2195 Add an option to avoid seeks after handling exceptions. Instead, pause the consumer for one `poll()` and use the remaining records as the result of that poll. New methods on `CommonErrorHandler` - `handleOne` for record listeners, returning a boolean to indicate whether the record was recovered and should not be redelivered. `handlaBatchAndReturnRemaining` for batch listeners, returning either the complete set or a subset, e.g. when the `DEH` receives a `BatchListenerExecutionFailedException` and commits a partial batch. Also includes the classifier refactoring discussed here #2185 (comment) The new logic is disabled by default, we can consider enabling it in 3.0 and remove the deprecations. * Fix race - do not call `resume()` on the container; the user might have paused after the error. * Change Since to 2.9. * Fix typos. Co-authored-by: Artem Bilan <[email protected]> * Remove unnecessary local variable; add docs. * Polishing - see commit comment for more details - move the resume logic to after the invokes and don't resume if pending records - don't check `isPaused()` after empty poll due to errors; always restore the pending records * Remove unnecessary boolean; fix deprecation warnings and delegating error handlers. * Emergency stop container if the consumer returns records while paused after an error. * Fix race in test - prevent consumer thread from changing pausedConsumers while the test thread is calling revoke/assign. * Remove System.out(). * Add diagnostics to test. * Fix race in test; wait until next poll after consumer thread pauses the partitions. * Fix stubbing in emergency stop test. * Remove unnecessary boolean. **Cherry-pick to `2.9.x`**
1 parent c635572 commit af56eca

22 files changed

+1164
-93
lines changed

Diff for: spring-kafka-docs/src/main/asciidoc/kafka.adoc

+12
Original file line numberDiff line numberDiff line change
@@ -5098,6 +5098,11 @@ If you are using Spring Boot, you simply need to add the error handler as a `@Be
50985098
This new error handler replaces the `SeekToCurrentErrorHandler` and `RecoveringBatchErrorHandler`, which have been the default error handlers for several releases now.
50995099
One difference is that the fallback behavior for batch listeners (when an exception other than a `BatchListenerFailedException` is thrown) is the equivalent of the <<retrying-batch-eh>>.
51005100

5101+
IMPORTANT: Starting with version 2.9, the `DefaultErrorHandler` can be configured to provide the same semantics as seeking the unprocessed record offsets as discussed below, but without actually seeking.
5102+
Instead, the records are retained by the listener container and resubmitted to the listener after the error handler exits (and after performing a single paused `poll()`, to keep the consumer alive).
5103+
The error handler returns a result to the container that indicates whether the current failing record can be resubmitted, or if it was recovered and then it will not be sent to the listener again.
5104+
To enable this mode, set the property `seekAfterError` to `false`.
5105+
51015106
The error handler can recover (skip) a record that keeps failing.
51025107
By default, after ten failures, the failed record is logged (at the `ERROR` level).
51035108
You can configure the handler with a custom recoverer (`BiConsumer`) and a `BackOff` that controls the delivery attempts and delays between each.
@@ -5152,6 +5157,11 @@ The sequence of events is:
51525157
The recovered record's offset is committed
51535158
* If retries are exhausted and recovery fails, seeks are performed as if retries are not exhausted.
51545159

5160+
IMPORTANT: Starting with version 2.9, the `DefaultErrorHandler` can be configured to provide the same semantics as seeking the unprocessed record offsets as discussed above, but without actually seeking.
5161+
Instead, error handler creates a new `ConsumerRecords<?, ?>` containing just the unprocessed records which will then be submitted to the listener (after performing a single paused `poll()`, to keep the consumer alive).
5162+
To enable this mode, set the property `seekAfterError` to `false`.
5163+
5164+
51555165
The default recoverer logs the failed record after retries are exhausted.
51565166
You can use a custom recoverer, or one provided by the framework such as the <<dead-letters,`DeadLetterPublishingRecoverer`>>.
51575167

@@ -5250,6 +5260,8 @@ If the function returns `null`, the handler's default `BackOff` will be used.
52505260
Set `resetStateOnExceptionChange` to `true` and the retry sequence will be restarted (including the selection of a new `BackOff`, if so configured) if the exception type changes between failures.
52515261
By default, the exception type is not considered.
52525262

5263+
Starting with version 2.9, this is now `true` by default.
5264+
52535265
Also see <<delivery-header>>.
52545266

52555267
[[batch-listener-conv-errors]]

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/CommonContainerStoppingErrorHandler.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -71,10 +71,17 @@ public void setStopContainerAbnormally(boolean stopContainerAbnormally) {
7171
}
7272

7373
@Override
74+
@Deprecated
7475
public boolean remainingRecords() {
7576
return true;
7677
}
7778

79+
@Override
80+
public boolean seeksAfterHandling() {
81+
// We don't actually do any seeks here, but stopping the container has the same effect.
82+
return true;
83+
}
84+
7885
@Override
7986
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
8087
MessageListenerContainer container, boolean batchListener) {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -65,11 +65,17 @@ public void setErrorHandlers(Map<Class<? extends Throwable>, CommonErrorHandler>
6565
}
6666

6767

68+
@SuppressWarnings("deprecation")
6869
@Override
6970
public boolean remainingRecords() {
7071
return this.defaultErrorHandler.remainingRecords();
7172
}
7273

74+
@Override
75+
public boolean seeksAfterHandling() {
76+
return this.defaultErrorHandler.seeksAfterHandling();
77+
}
78+
7379
@Override
7480
public void clearThreadState() {
7581
this.defaultErrorHandler.clearThreadState();
@@ -96,14 +102,18 @@ public void addDelegate(Class<? extends Throwable> throwable, CommonErrorHandler
96102
checkDelegates();
97103
}
98104

105+
@SuppressWarnings("deprecation")
99106
private void checkDelegates() {
100107
boolean remainingRecords = this.defaultErrorHandler.remainingRecords();
101108
boolean ackAfterHandle = this.defaultErrorHandler.isAckAfterHandle();
109+
boolean seeksAfterHandling = this.defaultErrorHandler.seeksAfterHandling();
102110
this.delegates.values().forEach(handler -> {
103111
Assert.isTrue(remainingRecords == handler.remainingRecords(),
104112
"All delegates must return the same value when calling 'remainingRecords()'");
105113
Assert.isTrue(ackAfterHandle == handler.isAckAfterHandle(),
106114
"All delegates must return the same value when calling 'isAckAfterHandle()'");
115+
Assert.isTrue(seeksAfterHandling == handler.seeksAfterHandling(),
116+
"All delegates must return the same value when calling 'seeksAfterHandling()'");
107117
});
108118
}
109119

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java

+63-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,13 +41,25 @@ public interface CommonErrorHandler extends DeliveryAttemptAware {
4141
* When true (default), all remaining records including the failed record are passed
4242
* to the error handler.
4343
* @return false to receive only the failed record.
44+
* @deprecated in favor of {@link #seeksAfterHandling()}.
4445
* @see #handleRecord(Exception, ConsumerRecord, Consumer, MessageListenerContainer)
4546
* @see #handleRemaining(Exception, List, Consumer, MessageListenerContainer)
4647
*/
48+
@Deprecated
4749
default boolean remainingRecords() {
4850
return false;
4951
}
5052

53+
/**
54+
* Return true if this error handler performs seeks on the failed record and remaining
55+
* records (or just the remaining records after a failed record is recovered).
56+
* @return true if the next poll should fetch records.
57+
*/
58+
@SuppressWarnings("deprecation")
59+
default boolean seeksAfterHandling() {
60+
return remainingRecords();
61+
}
62+
5163
/**
5264
* Return true if this error handler supports delivery attempts headers.
5365
* @return true if capable.
@@ -79,14 +91,42 @@ default void handleOtherException(Exception thrownException, Consumer<?, ?> cons
7991
* @param record the record.
8092
* @param consumer the consumer.
8193
* @param container the container.
94+
* @deprecated in favor of
95+
* {@link #handleOne(Exception, ConsumerRecord, Consumer, MessageListenerContainer)}.
8296
* @see #remainingRecords()
8397
*/
98+
@Deprecated
8499
default void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
85100
MessageListenerContainer container) {
86101

87102
LogFactory.getLog(getClass()).error("'handleRecord' is not implemented by this handler", thrownException);
88103
}
89104

105+
/**
106+
* Handle the exception for a record listener when {@link #remainingRecords()} returns
107+
* false. Use this to handle just the single failed record.
108+
* @param thrownException the exception.
109+
* @param record the record.
110+
* @param consumer the consumer.
111+
* @param container the container.
112+
* @return true if the error was "handled" or false if not and the container will
113+
* re-submit the record to the listener.
114+
* @since 2.9
115+
* @see #remainingRecords()
116+
*/
117+
@SuppressWarnings("deprecation")
118+
default boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
119+
MessageListenerContainer container) {
120+
121+
try {
122+
handleRecord(thrownException, record, consumer, container);
123+
return true;
124+
}
125+
catch (Exception ex) {
126+
return false;
127+
}
128+
}
129+
90130
/**
91131
* Handle the exception for a record listener when {@link #remainingRecords()} returns
92132
* true. The failed record and all the remaining records from the poll are passed in.
@@ -120,6 +160,28 @@ default void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data,
120160
LogFactory.getLog(getClass()).error("'handleBatch' is not implemented by this handler", thrownException);
121161
}
122162

163+
/**
164+
* Handle the exception for a batch listener. The complete {@link ConsumerRecords}
165+
* from the poll is supplied. Return the members of the batch that should be re-sent to
166+
* the listener. The returned records MUST be in the same order as the original records.
167+
* @param thrownException the exception.
168+
* @param data the consumer records.
169+
* @param consumer the consumer.
170+
* @param container the container.
171+
* @param invokeListener a callback to re-invoke the listener.
172+
* @param <K> the key type.
173+
* @param <V> the value type.
174+
* @return the consumer records, or a subset.
175+
* @since 2.9
176+
*/
177+
default <K, V> ConsumerRecords<K, V> handleBatchAndReturnRemaining(Exception thrownException,
178+
ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container,
179+
Runnable invokeListener) {
180+
181+
handleBatch(thrownException, data, consumer, container, invokeListener);
182+
return ConsumerRecords.empty();
183+
}
184+
123185
@Override
124186
default int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
125187
return 0;

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/CommonLoggingErrorHandler.java

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
4848
}
4949

5050
@Override
51+
@Deprecated
5152
public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
5253
MessageListenerContainer container) {
5354

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/CommonMixedErrorHandler.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,11 +51,17 @@ public CommonMixedErrorHandler(CommonErrorHandler recordErrorHandler, CommonErro
5151
this.batchErrorHandler = batchErrorHandler;
5252
}
5353

54+
@SuppressWarnings("deprecation")
5455
@Override
5556
public boolean remainingRecords() {
5657
return this.recordErrorHandler.remainingRecords();
5758
}
5859

60+
@Override
61+
public boolean seeksAfterHandling() {
62+
return this.recordErrorHandler.seeksAfterHandling();
63+
}
64+
5965
@Override
6066
public boolean deliveryAttemptHeader() {
6167
return this.recordErrorHandler.deliveryAttemptHeader();
@@ -73,10 +79,10 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
7379
}
7480

7581
@Override
76-
public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
82+
public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
7783
MessageListenerContainer container) {
7884

79-
this.recordErrorHandler.handleRecord(thrownException, record, consumer, container);
85+
return this.recordErrorHandler.handleOne(thrownException, record, consumer, container);
8086
}
8187

8288
@Override

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,8 @@ private void checkConfig() {
133133
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
134134
@Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {
135135

136-
if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
137-
getRecoveryStrategy((List) records, exception), container, this.logger)
136+
if (SeekUtils.doSeeks((List) records, consumer, exception, recoverable,
137+
getFailureTracker()::recovered, container, this.logger)
138138
&& isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
139139
ConsumerRecord<K, V> skipped = records.get(0);
140140
this.kafkaTemplate.sendOffsetsToTransaction(

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java

+28-2
Original file line numberDiff line numberDiff line change
@@ -117,21 +117,39 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
117117
}
118118

119119
@Override
120+
@Deprecated
120121
public boolean remainingRecords() {
121-
return true;
122+
return isSeekAfterError();
123+
}
124+
125+
@Override
126+
public boolean seeksAfterHandling() {
127+
return isSeekAfterError();
122128
}
123129

124130
@Override
125131
public boolean deliveryAttemptHeader() {
126132
return true;
127133
}
128134

135+
@Override
136+
public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
137+
MessageListenerContainer container) {
138+
139+
try {
140+
return getFailureTracker().recovered(record, thrownException, container, consumer);
141+
}
142+
catch (Exception ex) {
143+
return false;
144+
}
145+
}
146+
129147
@Override
130148
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
131149
Consumer<?, ?> consumer, MessageListenerContainer container) {
132150

133151
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
134-
getRecoveryStrategy(records, consumer, thrownException), this.logger, getLogLevel());
152+
getFailureTracker()::recovered, this.logger, getLogLevel());
135153
}
136154

137155
@Override
@@ -141,6 +159,14 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
141159
doHandle(thrownException, data, consumer, container, invokeListener);
142160
}
143161

162+
@Override
163+
public <K, V> ConsumerRecords<K, V> handleBatchAndReturnRemaining(Exception thrownException,
164+
ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container,
165+
Runnable invokeListener) {
166+
167+
return handle(thrownException, data, consumer, container, invokeListener);
168+
}
169+
144170
@Override
145171
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
146172
MessageListenerContainer container, boolean batchListener) {

0 commit comments

Comments
 (0)