Skip to content

Commit 502e08b

Browse files
committed
GH-615: Add DefaultErrorHandler
See spring-projects#615 Replaces legacy `SeekToCurrentErrorHandler` and `RecoveringBatchErrorHandler`. These were the previous defaults when no transaction manager is present. They will be deprecated in a future PR. - refactor common code into superclass/utilities - copy existing test case classes, changing the error handler types
1 parent 4ecc787 commit 502e08b

9 files changed

+1187
-190
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.List;
20+
21+
import org.apache.kafka.clients.consumer.Consumer;
22+
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
import org.apache.kafka.clients.consumer.ConsumerRecords;
24+
import org.apache.kafka.common.errors.SerializationException;
25+
26+
import org.springframework.lang.Nullable;
27+
import org.springframework.util.backoff.BackOff;
28+
29+
/**
 * An error handler that, for record listeners, seeks to the current offset for each topic
 * in the remaining records. Used to rewind partitions after a message failure so that it
 * can be replayed. For batch listeners, seeks to the current offset for each topic in a
 * batch of records. Used to rewind partitions after a message failure so that the batch
 * can be replayed. If the listener throws a {@link BatchListenerFailedException} with
 * the failed record, the records before that record will have their offsets committed and
 * the partitions for the remaining records will be repositioned and/or the failed record
 * can be recovered and skipped. If some other exception is thrown, or a valid record is
 * not provided in the exception, error handling is delegated to a
 * {@link RetryingBatchErrorHandler} with this handler's {@link BackOff}. If the record is
 * recovered, its offset is committed. This is a replacement for the legacy
 * {@link SeekToCurrentErrorHandler} and {@link SeekToCurrentBatchErrorHandler} (but the
 * fallback now can send the messages to a recoverer after retries are completed instead
 * of retrying indefinitely).
 *
 * @author Gary Russell
 *
 * @since 2.8
 *
 */
public class DefaultErrorHandler extends FailedBatchProcessor implements CommonErrorHandler {

	// Whether the container should ack (commit) after this handler completes; mutable via setter.
	private boolean ackAfterHandle = true;

	/**
	 * Construct an instance with the default recoverer which simply logs the record after
	 * {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a
	 * topic/partition/offset, with the default back off (9 retries, no delay).
	 */
	public DefaultErrorHandler() {
		this(null, SeekUtils.DEFAULT_BACK_OFF);
	}

	/**
	 * Construct an instance with the default recoverer which simply logs the record after
	 * the backOff returns STOP for a topic/partition/offset.
	 * @param backOff the {@link BackOff}.
	 */
	public DefaultErrorHandler(BackOff backOff) {
		this(null, backOff);
	}

	/**
	 * Construct an instance with the provided recoverer which will be called after
	 * {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a
	 * topic/partition/offset.
	 * @param recoverer the recoverer.
	 */
	public DefaultErrorHandler(ConsumerRecordRecoverer recoverer) {
		this(recoverer, SeekUtils.DEFAULT_BACK_OFF);
	}

	/**
	 * Construct an instance with the provided recoverer which will be called after
	 * the backOff returns STOP for a topic/partition/offset.
	 * @param recoverer the recoverer; if null, the default (logging) recoverer is used.
	 * @param backOff the {@link BackOff}.
	 */
	public DefaultErrorHandler(@Nullable ConsumerRecordRecoverer recoverer, BackOff backOff) {
		super(recoverer, backOff, createFallback(backOff, recoverer));
	}

	// Fallback used by the superclass for batch errors that cannot be attributed to a
	// single failed record: retry the whole batch with the same backOff/recoverer.
	private static CommonErrorHandler createFallback(BackOff backOff, ConsumerRecordRecoverer recoverer) {
		return new ErrorHandlerAdapter(new RetryingBatchErrorHandler(backOff, recoverer));
	}

	/**
	 * {@inheritDoc}
	 * The container must be configured with
	 * {@link org.springframework.kafka.listener.ContainerProperties.AckMode#MANUAL_IMMEDIATE}.
	 * Whether or not the commit is sync or async depends on the container's syncCommits
	 * property.
	 * @param commitRecovered true to commit.
	 */
	@Override
	public void setCommitRecovered(boolean commitRecovered) { // NOSONAR enhanced javadoc
		super.setCommitRecovered(commitRecovered);
	}

	@Override
	public boolean isAckAfterHandle() {
		return this.ackAfterHandle;
	}

	@Override
	public void setAckAfterHandle(boolean ackAfterHandle) {
		this.ackAfterHandle = ackAfterHandle;
	}

	/**
	 * Handle the exception for a record listener: seek the remaining records so they are
	 * redelivered, or recover (and optionally commit) the failed record when retries are
	 * exhausted.
	 */
	@Override
	public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
			Consumer<?, ?> consumer, MessageListenerContainer container) {

		SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
				getRecoveryStrategy(records, thrownException), this.logger, getLogLevel());
	}

	/**
	 * Handle the exception for a batch listener; delegates to the superclass, which
	 * either handles the failed record from a {@link BatchListenerFailedException} or
	 * invokes the fallback handler.
	 */
	@Override
	public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
			MessageListenerContainer container, Runnable invokeListener) {

		doHandle(thrownException, data, consumer, container, invokeListener);
	}

	/**
	 * Handle exceptions thrown outside record processing (no record information is
	 * available); this handler cannot recover in that case, so rethrow as
	 * {@link IllegalStateException} with guidance for the common deserialization case.
	 */
	@Override
	public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
			MessageListenerContainer container) {

		if (thrownException instanceof SerializationException) {
			throw new IllegalStateException("This error handler cannot process 'SerializationException's directly; "
					+ "please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key "
					+ "deserializer", thrownException);
		}
		else {
			throw new IllegalStateException("This error handler cannot process '"
					+ thrownException.getClass().getName()
					+ "'s; no record information is available", thrownException);
		}
	}

}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.time.Duration;
20+
import java.util.function.BiConsumer;
21+
22+
import org.apache.kafka.clients.consumer.Consumer;
23+
import org.apache.kafka.clients.consumer.ConsumerRecords;
24+
25+
import org.springframework.core.log.LogAccessor;
26+
import org.springframework.kafka.KafkaException;
27+
import org.springframework.util.backoff.BackOff;
28+
import org.springframework.util.backoff.BackOffExecution;
29+
30+
/**
31+
* Utilities for error handling.
32+
*
33+
* @author Gary Russell
34+
* @since 2.8
35+
*
36+
*/
37+
public final class ErrorHandlingUtils {
38+
39+
private ErrorHandlingUtils() {
40+
}
41+
42+
/**
43+
* Retry a complete batch by pausing the consumer and then, in a loop, poll the
44+
* consumer, wait for the next back off, then call the listener. When retries are
45+
* exhausted, call the recoverer with the {@link ConsumerRecords}.
46+
* @param thrownException the exception.
47+
* @param records the records.
48+
* @param consumer the consumer.
49+
* @param container the container.
50+
* @param invokeListener the {@link Runnable} to run (call the listener).
51+
* @param backOff the backOff.
52+
* @param seeker the common error handler that re-seeks the entire batch.
53+
* @param recoverer the recoverer.
54+
* @param logger the logger.
55+
* @param logLevel the log level.
56+
*/
57+
public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
58+
MessageListenerContainer container, Runnable invokeListener, BackOff backOff,
59+
CommonErrorHandler seeker, BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer, LogAccessor logger,
60+
KafkaException.Level logLevel) {
61+
62+
BackOffExecution execution = backOff.start();
63+
long nextBackOff = execution.nextBackOff();
64+
String failed = null;
65+
consumer.pause(consumer.assignment());
66+
try {
67+
while (nextBackOff != BackOffExecution.STOP) {
68+
consumer.poll(Duration.ZERO);
69+
try {
70+
ListenerUtils.stoppableSleep(container, nextBackOff);
71+
}
72+
catch (InterruptedException e1) {
73+
Thread.currentThread().interrupt();
74+
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
75+
throw new KafkaException("Interrupted during retry", logLevel, e1);
76+
}
77+
try {
78+
invokeListener.run();
79+
return;
80+
}
81+
catch (Exception e) {
82+
if (failed == null) {
83+
failed = recordsToString(records);
84+
}
85+
String toLog = failed;
86+
logger.debug(e, () -> "Retry failed for: " + toLog);
87+
}
88+
nextBackOff = execution.nextBackOff();
89+
}
90+
try {
91+
recoverer.accept(records, thrownException);
92+
}
93+
catch (Exception e) {
94+
logger.error(e, () -> "Recoverer threw an exception; re-seeking batch");
95+
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
96+
}
97+
}
98+
finally {
99+
consumer.resume(consumer.assignment());
100+
}
101+
}
102+
103+
/**
104+
* Represent the records as a comma-delimited String of {@code topic-part@offset}.
105+
* @param records the records.
106+
* @return the String.
107+
*/
108+
public static String recordsToString(ConsumerRecords<?, ?> records) {
109+
StringBuffer sb = new StringBuffer();
110+
records.spliterator().forEachRemaining(rec -> sb
111+
.append(ListenerUtils.recordToString(rec, true))
112+
.append(','));
113+
sb.deleteCharAt(sb.length() - 1);
114+
return sb.toString();
115+
}
116+
117+
}

0 commit comments

Comments
 (0)