Skip to content

Commit 17ea294

Browse files
committed
spring-projects GH-615: Add DefaultErrorHandler
See spring-projects#615. Replaces the legacy `SeekToCurrentErrorHandler` and `RecoveringBatchErrorHandler`. These were the previous defaults when no transaction manager is present. They will be deprecated in a future PR. - refactor common code into superclass/utilities - copy existing test case classes, changing the error handler types
1 parent 03dc8c1 commit 17ea294

10 files changed

+865
-97
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,18 @@
3636
* the partitions for the remaining records will be repositioned and/or the failed record
3737
* can be recovered and skipped. If some other exception is thrown, or a valid record is
3838
* not provided in the exception, error handling is delegated to a
39-
* {@link SeekToCurrentBatchErrorHandler} with this handler's {@link BackOff}. If the
40-
* record is recovered, its offset is committed.
39+
* {@link RetryingBatchErrorHandler} with this handler's {@link BackOff}. If the record is
40+
* recovered, its offset is committed. This is a replacement for the legacy
41+
* {@link SeekToCurrentErrorHandler} and {@link SeekToCurrentBatchErrorHandler} (but the
42+
* fallback now can send the messages to a recoverer after retries are completed instead
43+
* of retrying indefinitely).
4144
*
4245
* @author Gary Russell
4346
*
4447
* @since 2.8
4548
*
4649
*/
47-
public class DefaultErrorHandler extends FailedRecordProcessor implements CommonErrorHandler {
50+
public class DefaultErrorHandler extends FailedBatchProcessor implements CommonErrorHandler {
4851

4952
private boolean ackAfterHandle = true;
5053

@@ -83,7 +86,11 @@ public DefaultErrorHandler(ConsumerRecordRecoverer recoverer) {
8386
* @param backOff the {@link BackOff}.
8487
*/
8588
public DefaultErrorHandler(@Nullable ConsumerRecordRecoverer recoverer, BackOff backOff) {
86-
super(recoverer, backOff);
89+
super(recoverer, backOff, createFallback(backOff, recoverer));
90+
}
91+
92+
private static CommonErrorHandler createFallback(BackOff backOff, ConsumerRecordRecoverer recoverer) {
93+
return new ErrorHandlerAdapter(new RetryingBatchErrorHandler(backOff, recoverer));
8794
}
8895

8996
/**
@@ -121,7 +128,7 @@ public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>
121128
public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
122129
MessageListenerContainer container, Runnable invokeListener) {
123130

124-
CommonErrorHandler.super.handleBatch(thrownException, data, consumer, container, invokeListener);
131+
doHandle(thrownException, data, consumer, container, invokeListener);
125132
}
126133

127134
@Override
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
/*
 * Copyright 2021 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.listener;

import java.time.Duration;
import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;

/**
 * Utilities for error handling.
 *
 * @author Gary Russell
 * @since 2.8
 *
 */
public final class ErrorHandlingUtils {

	private ErrorHandlingUtils() {
	}

	/**
	 * Retry a complete batch by pausing the consumer and then, in a loop, poll the
	 * consumer, wait for the next back off, then call the listener. When retries are
	 * exhausted, call the recoverer with the {@link ConsumerRecords}. If the recoverer
	 * itself fails, or the retry sleep is interrupted, the {@code seeker} is invoked so
	 * the whole batch is re-seeked and redelivered.
	 * @param thrownException the exception.
	 * @param records the records.
	 * @param consumer the consumer.
	 * @param container the container.
	 * @param invokeListener the {@link Runnable} to run (call the listener).
	 * @param backOff the backOff.
	 * @param seeker the common error handler that re-seeks the entire batch.
	 * @param recoverer the recoverer.
	 * @param logger the logger.
	 * @param logLevel the log level.
	 */
	public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
			MessageListenerContainer container, Runnable invokeListener, BackOff backOff,
			CommonErrorHandler seeker, BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer, LogAccessor logger,
			KafkaException.Level logLevel) {

		BackOffExecution execution = backOff.start();
		long nextBackOff = execution.nextBackOff();
		String failed = null;
		consumer.pause(consumer.assignment());
		try {
			while (nextBackOff != BackOffExecution.STOP) {
				// Keep polling while paused; all partitions are paused so no records are
				// returned, but the poll keeps the consumer active in the group
				// (presumably to avoid a rebalance during long back offs - TODO confirm).
				consumer.poll(Duration.ZERO);
				try {
					ListenerUtils.stoppableSleep(container, nextBackOff);
				}
				catch (InterruptedException e1) {
					// Restore the interrupt flag, re-seek the batch so it is redelivered,
					// and propagate.
					Thread.currentThread().interrupt();
					seeker.handleBatch(thrownException, records, consumer, container, () -> { });
					throw new KafkaException("Interrupted during retry", logLevel, e1);
				}
				try {
					invokeListener.run();
					return; // retry succeeded
				}
				catch (Exception e) {
					if (failed == null) {
						// Render the batch once, lazily, on the first failed retry.
						failed = recordsToString(records);
					}
					String toLog = failed;
					logger.debug(e, () -> "Retry failed for: " + toLog);
				}
				nextBackOff = execution.nextBackOff();
			}
			// Retries exhausted; hand the whole batch to the recoverer.
			try {
				recoverer.accept(records, thrownException);
			}
			catch (Exception e) {
				// Recovery failed - re-seek so the batch is redelivered rather than lost.
				logger.error(e, () -> "Recoverer threw an exception; re-seeking batch");
				seeker.handleBatch(thrownException, records, consumer, container, () -> { });
			}
		}
		finally {
			consumer.resume(consumer.assignment());
		}
	}

	/**
	 * Represent the records as a comma-delimited String of {@code topic-part@offset}.
	 * Returns an empty String when the records are empty.
	 * @param records the records.
	 * @return the String.
	 */
	public static String recordsToString(ConsumerRecords<?, ?> records) {
		// StringBuilder: method-local, single-threaded - no need for StringBuffer's locking.
		StringBuilder sb = new StringBuilder();
		records.spliterator().forEachRemaining(rec -> sb
				.append(ListenerUtils.recordToString(rec, true))
				.append(','));
		if (sb.length() > 0) {
			// Drop the trailing comma; guard against empty records, which previously
			// threw StringIndexOutOfBoundsException here.
			sb.deleteCharAt(sb.length() - 1);
		}
		return sb.toString();
	}

}

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,15 @@
3939
import org.springframework.util.backoff.BackOff;
4040

4141
/**
42+
* Subclass of {@link FailedRecordProcessor} that can process (and recover) a batch. If
43+
* the listener throws a {@link BatchListenerFailedException}, the offsets prior to the
44+
* failed record are committed and the remaining records have seeks performed. When the
45+
* retries are exhausted, the failed record is sent to the recoverer instead of being
46+
* included in the seeks. If other exceptions are thrown processing is delegated to the
47+
* fallback handler.
48+
*
4249
* @author Gary Russell
43-
* @since 2.7
50+
* @since 2.8
4451
*
4552
*/
4653
public abstract class FailedBatchProcessor extends FailedRecordProcessor {
@@ -53,9 +60,11 @@ public abstract class FailedBatchProcessor extends FailedRecordProcessor {
5360
* Construct an instance with the provided properties.
5461
* @param recoverer the recoverer.
5562
* @param backOff the back off.
63+
* @param fallbackHandler the fallback handler.
5664
*/
57-
public FailedBatchProcessor(@Nullable
58-
BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff, CommonErrorHandler fallbackHandler) {
65+
public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
66+
CommonErrorHandler fallbackHandler) {
67+
5968
super(recoverer, backOff);
6069
this.fallbackHandler = fallbackHandler;
6170
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -797,14 +797,17 @@ else if (listener instanceof MessageListener) {
797797
}
798798

799799
@Nullable
800-
private CommonErrorHandler determineCommonErrorHandler(GenericErrorHandler<?> errHandler) {
800+
private CommonErrorHandler determineCommonErrorHandler(@Nullable GenericErrorHandler<?> errHandler) {
801801
CommonErrorHandler common = getCommonErrorHandler();
802802
if (common != null) {
803803
if (errHandler != null) {
804804
this.logger.debug("GenericErrorHandler is ignored when a CommonErrorHandler is provided");
805805
}
806806
return common;
807807
}
808+
if (errHandler == null && this.transactionManager == null) {
809+
return new DefaultErrorHandler();
810+
}
808811
if (this.isBatchListener) {
809812
validateErrorHandler(true);
810813
BatchErrorHandler batchErrorHandler = determineBatchErrorHandler(errHandler);

spring-kafka/src/main/java/org/springframework/kafka/listener/RecoveringBatchErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
106106
public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
107107
MessageListenerContainer container) {
108108

109-
doHandle(thrownException, data, consumer, container);
109+
doHandle(thrownException, data, consumer, container, () -> { });
110110
}
111111

112112
}

spring-kafka/src/main/java/org/springframework/kafka/listener/RecoveryUtils.java

Lines changed: 0 additions & 30 deletions
This file was deleted.

spring-kafka/src/main/java/org/springframework/kafka/listener/RetryingBatchErrorHandler.java

Lines changed: 4 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,15 @@
1616

1717
package org.springframework.kafka.listener;
1818

19-
import java.time.Duration;
2019
import java.util.function.BiConsumer;
2120

2221
import org.apache.commons.logging.LogFactory;
2322
import org.apache.kafka.clients.consumer.Consumer;
2423
import org.apache.kafka.clients.consumer.ConsumerRecords;
2524

2625
import org.springframework.core.log.LogAccessor;
27-
import org.springframework.kafka.KafkaException;
2826
import org.springframework.lang.Nullable;
2927
import org.springframework.util.backoff.BackOff;
30-
import org.springframework.util.backoff.BackOffExecution;
3128
import org.springframework.util.backoff.FixedBackOff;
3229

3330
/**
@@ -51,7 +48,7 @@ public class RetryingBatchErrorHandler extends KafkaExceptionLogLevelAware
5148

5249
private final BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer;
5350

54-
private final SeekToCurrentBatchErrorHandler seeker = new SeekToCurrentBatchErrorHandler();
51+
private final CommonErrorHandler seeker = new ErrorHandlerAdapter(new SeekToCurrentBatchErrorHandler());
5552

5653
private boolean ackAfterHandle = true;
5754

@@ -74,7 +71,7 @@ public RetryingBatchErrorHandler(BackOff backOff, @Nullable ConsumerRecordRecove
7471
this.backOff = backOff;
7572
this.recoverer = (crs, ex) -> {
7673
if (recoverer == null) {
77-
LOGGER.error(ex, () -> "Records discarded: " + tpos(crs));
74+
LOGGER.error(ex, () -> "Records discarded: " + ErrorHandlingUtils.recordsToString(crs));
7875
}
7976
else {
8077
crs.spliterator().forEachRemaining(rec -> recoverer.accept(rec, ex));
@@ -100,58 +97,8 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> re
10097
LOGGER.error(thrownException, "Called with no records; consumer exception");
10198
return;
10299
}
103-
BackOffExecution execution = this.backOff.start();
104-
long nextBackOff = execution.nextBackOff();
105-
String failed = null;
106-
consumer.pause(consumer.assignment());
107-
try {
108-
while (nextBackOff != BackOffExecution.STOP) {
109-
consumer.poll(Duration.ZERO);
110-
try {
111-
ListenerUtils.stoppableSleep(container, nextBackOff);
112-
}
113-
catch (InterruptedException e1) {
114-
Thread.currentThread().interrupt();
115-
this.seeker.handle(thrownException, records, consumer, container);
116-
throw new KafkaException("Interrupted during retry", getLogLevel(), e1);
117-
}
118-
try {
119-
invokeListener.run();
120-
return;
121-
}
122-
catch (Exception e) {
123-
if (failed == null) {
124-
failed = tpos(records);
125-
}
126-
String toLog = failed;
127-
LOGGER.debug(e, () -> "Retry failed for: " + toLog);
128-
}
129-
nextBackOff = execution.nextBackOff();
130-
}
131-
try {
132-
this.recoverer.accept(records, thrownException);
133-
}
134-
catch (Exception e) {
135-
LOGGER.error(e, () -> "Recoverer threw an exception; re-seeking batch");
136-
this.seeker.handle(thrownException, records, consumer, container);
137-
}
138-
}
139-
finally {
140-
consumer.resume(consumer.assignment());
141-
}
142-
}
143-
144-
private String tpos(ConsumerRecords<?, ?> records) {
145-
StringBuffer sb = new StringBuffer();
146-
records.spliterator().forEachRemaining(rec -> sb
147-
.append(rec.topic())
148-
.append('-')
149-
.append(rec.partition())
150-
.append('@')
151-
.append(rec.offset())
152-
.append(','));
153-
sb.deleteCharAt(sb.length() - 1);
154-
return sb.toString();
100+
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
101+
this.seeker, this.recoverer, LOGGER, getLogLevel());
155102
}
156103

157104
}

0 commit comments

Comments
 (0)