Skip to content

Commit 8b9272a

Browse files
committed
spring-projectsGH-2195: DefaultErrorHandlerImprovements
Resolves spring-projects#2195 Add an option to avoid seeks after handling exceptions. Instead, pause the consumer for one `poll()` and use the remaining records as the result of that poll. New methods on `CommonErrorHandler` - `handleOne` for record listeners, returning a boolean to indicate whether the record was recovered and should not be redelivered. `handlaBatchAndReturnRemaining` for batch listeners, returning either the complete set or a subset, e.g. when the `DEH` receives a `BatchListenerExecutionFailedException` and commits a partial batch. Also includes the classifier refactoring discussed here spring-projects#2185 (comment) The new logic is disabled by default, we can consider enabling it in 3.0 and remove the deprecations.
1 parent 10905dc commit 8b9272a

18 files changed

+1058
-81
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,13 +41,25 @@ public interface CommonErrorHandler extends DeliveryAttemptAware {
4141
* When true (default), all remaining records including the failed record are passed
4242
* to the error handler.
4343
* @return false to receive only the failed record.
44+
* @deprecated in favor of {@link #seeksAfterHandling()}.
4445
* @see #handleRecord(Exception, ConsumerRecord, Consumer, MessageListenerContainer)
4546
* @see #handleRemaining(Exception, List, Consumer, MessageListenerContainer)
4647
*/
48+
@Deprecated
4749
default boolean remainingRecords() {
4850
return false;
4951
}
5052

53+
/**
54+
* Return true if this error handle performs seeks on the failed record and remaining
55+
* records (or just the remaining records after a failed record is recovered).
56+
* @return true if the next poll should fetch records.
57+
*/
58+
@SuppressWarnings("deprecation")
59+
default boolean seeksAfterHandling() {
60+
return remainingRecords();
61+
}
62+
5163
/**
5264
* Return true if this error handler supports delivery attempts headers.
5365
* @return true if capable.
@@ -79,14 +91,41 @@ default void handleOtherException(Exception thrownException, Consumer<?, ?> cons
7991
* @param record the record.
8092
* @param consumer the consumer.
8193
* @param container the container.
94+
* @deprecated in favor of
95+
* {@link #handleOne(Exception, ConsumerRecord, Consumer, MessageListenerContainer)}.
8296
* @see #remainingRecords()
8397
*/
98+
@Deprecated
8499
default void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
85100
MessageListenerContainer container) {
86101

87102
LogFactory.getLog(getClass()).error("'handleRecord' is not implemented by this handler", thrownException);
88103
}
89104

105+
/**
106+
* Handle the exception for a record listener when {@link #remainingRecords()} returns
107+
* false. Use this to handle just the single failed record.
108+
* @param thrownException the exception.
109+
* @param record the record.
110+
* @param consumer the consumer.
111+
* @param container the container.
112+
* @return true if the error was "handled" or false if not and the container will
113+
* re-submit the record to the listener.
114+
* @see #remainingRecords()
115+
*/
116+
@SuppressWarnings("deprecation")
117+
default boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
118+
MessageListenerContainer container) {
119+
120+
try {
121+
handleRecord(thrownException, record, consumer, container);
122+
return true;
123+
}
124+
catch (Exception ex) {
125+
return false;
126+
}
127+
}
128+
90129
/**
91130
* Handle the exception for a record listener when {@link #remainingRecords()} returns
92131
* true. The failed record and all the remaining records from the poll are passed in.
@@ -120,6 +159,28 @@ default void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data,
120159
LogFactory.getLog(getClass()).error("'handleBatch' is not implemented by this handler", thrownException);
121160
}
122161

162+
/**
163+
* Handle the exception for a batch listener. The complete {@link ConsumerRecords}
164+
* from the poll is supplied. Return the members of the batch that should be re-sent to
165+
* the listener. The returned records MUST be in the same order as the original records.
166+
* @param thrownException the exception.
167+
* @param data the consumer records.
168+
* @param consumer the consumer.
169+
* @param container the container.
170+
* @param invokeListener a callback to re-invoke the listener.
171+
* @param <K> the key type.
172+
* @param <V> the value type.
173+
* @return the consumer records, or a subset.
174+
* @since 2.8.5
175+
*/
176+
default <K, V> ConsumerRecords<K, V> handleBatchAndReturnRemaining(Exception thrownException,
177+
ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container,
178+
Runnable invokeListener) {
179+
180+
handleBatch(thrownException, data, consumer, container, invokeListener);
181+
return ConsumerRecords.empty();
182+
}
183+
123184
@Override
124185
default int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
125186
return 0;

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonMixedErrorHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -73,10 +73,10 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
7373
}
7474

7575
@Override
76-
public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
76+
public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
7777
MessageListenerContainer container) {
7878

79-
this.recordErrorHandler.handleRecord(thrownException, record, consumer, container);
79+
return this.recordErrorHandler.handleOne(thrownException, record, consumer, container);
8080
}
8181

8282
@Override

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,9 @@ private void checkConfig() {
133133
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
134134
@Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {
135135

136-
if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
137-
getRecoveryStrategy((List) records, exception), container, this.logger)
136+
List records2 = records;
137+
if (SeekUtils.doSeeks(records2, consumer, exception, recoverable,
138+
getFailureTracker()::recovered, container, this.logger)
138139
&& isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
139140
ConsumerRecord<K, V> skipped = records.get(0);
140141
this.kafkaTemplate.sendOffsetsToTransaction(

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,20 +118,37 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
118118

119119
@Override
120120
public boolean remainingRecords() {
121-
return true;
121+
return isSeekAfterError();
122+
}
123+
124+
@Override
125+
public boolean seeksAfterHandling() {
126+
return remainingRecords();
122127
}
123128

124129
@Override
125130
public boolean deliveryAttemptHeader() {
126131
return true;
127132
}
128133

134+
@Override
135+
public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
136+
MessageListenerContainer container) {
137+
138+
try {
139+
return getFailureTracker().recovered(record, thrownException, container, consumer);
140+
}
141+
catch (Exception ex) {
142+
return false;
143+
}
144+
}
145+
129146
@Override
130147
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
131148
Consumer<?, ?> consumer, MessageListenerContainer container) {
132149

133150
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
134-
getRecoveryStrategy(records, consumer, thrownException), this.logger, getLogLevel());
151+
getFailureTracker()::recovered, this.logger, getLogLevel());
135152
}
136153

137154
@Override
@@ -141,6 +158,14 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
141158
doHandle(thrownException, data, consumer, container, invokeListener);
142159
}
143160

161+
@Override
162+
public <K, V> ConsumerRecords<K, V> handleBatchAndReturnRemaining(Exception thrownException,
163+
ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container,
164+
Runnable invokeListener) {
165+
166+
return handle(thrownException, data, consumer, container, invokeListener);
167+
}
168+
144169
@Override
145170
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
146171
MessageListenerContainer container, boolean batchListener) {

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
7272
protected void doHandle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
7373
MessageListenerContainer container, Runnable invokeListener) {
7474

75+
handle(thrownException, data, consumer, container, invokeListener);
76+
}
77+
78+
protected <K, V> ConsumerRecords handle(Exception thrownException, ConsumerRecords<?, ?> data,
79+
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
80+
7581
BatchListenerFailedException batchListenerFailedException = getBatchListenerFailedException(thrownException);
7682
if (batchListenerFailedException == null) {
7783
this.logger.debug(thrownException, "Expected a BatchListenerFailedException; re-seeking batch");
@@ -87,9 +93,10 @@ protected void doHandle(Exception thrownException, ConsumerRecords<?, ?> data, C
8793
this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
8894
}
8995
else {
90-
seekOrRecover(thrownException, data, consumer, container, index);
96+
return seekOrRecover(thrownException, data, consumer, container, index);
9197
}
9298
}
99+
return ConsumerRecords.empty();
93100
}
94101

95102
private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
@@ -109,10 +116,12 @@ private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
109116
return i;
110117
}
111118

112-
private void seekOrRecover(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container, int indexArg) {
119+
@SuppressWarnings("unchecked")
120+
private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @Nullable ConsumerRecords<?, ?> data,
121+
Consumer<?, ?> consumer, MessageListenerContainer container, int indexArg) {
113122

114123
if (data == null) {
115-
return;
124+
return ConsumerRecords.empty();
116125
}
117126
Iterator<?> iterator = data.iterator();
118127
List<ConsumerRecord<?, ?>> toCommit = new ArrayList<>();
@@ -133,15 +142,37 @@ private void seekOrRecover(Exception thrownException, @Nullable ConsumerRecords<
133142
if (offsets.size() > 0) {
134143
commit(consumer, container, offsets);
135144
}
136-
if (remaining.size() > 0) {
137-
SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
138-
getRecoveryStrategy(remaining, thrownException), this.logger, getLogLevel());
139-
ConsumerRecord<?, ?> recovered = remaining.get(0);
140-
commit(consumer, container,
141-
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
142-
new OffsetAndMetadata(recovered.offset() + 1)));
143-
if (remaining.size() > 1) {
144-
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
145+
if (isSeekAfterError()) {
146+
if (remaining.size() > 0) {
147+
SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
148+
getFailureTracker()::recovered, this.logger, getLogLevel());
149+
ConsumerRecord<?, ?> recovered = remaining.get(0);
150+
commit(consumer, container,
151+
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
152+
new OffsetAndMetadata(recovered.offset() + 1)));
153+
if (remaining.size() > 1) {
154+
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
155+
}
156+
}
157+
return ConsumerRecords.empty();
158+
}
159+
else {
160+
if (indexArg == 0) {
161+
return (ConsumerRecords<K, V>) data; // first record just rerun the whole thing
162+
}
163+
else {
164+
try {
165+
if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
166+
consumer)) {
167+
remaining.remove(0);
168+
}
169+
}
170+
catch (Exception e) {
171+
}
172+
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
173+
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
174+
tp -> new ArrayList<ConsumerRecord<K, V>>()).add((ConsumerRecord<K, V>) rec));
175+
return new ConsumerRecords<>(remains);
145176
}
146177
}
147178
}

0 commit comments

Comments
 (0)