Skip to content

Commit 3b06529

Browse files
authored
GH-615: Add DefaultErrorHandler
* GH-615: Add DefaultErrorHandler See #615 Replaces legacy `SeekToCurrentErrorHandler` and `RecoveringBatchErrorHandler`. These were the previous defaults when no transaction manager is present. They will be deprecated in a future PR. - refactor common code into superclass/utilities - copy existing test case classes, changing the error handler types * Add CommonDelegatingErrorHandler
1 parent 24ff85d commit 3b06529

14 files changed

+1455
-190
lines changed
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.LinkedHashMap;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.Map.Entry;
23+
24+
import org.apache.kafka.clients.consumer.Consumer;
25+
import org.apache.kafka.clients.consumer.ConsumerRecord;
26+
import org.apache.kafka.clients.consumer.ConsumerRecords;
27+
28+
import org.springframework.lang.Nullable;
29+
import org.springframework.util.Assert;
30+
31+
/**
32+
* An error handler that delegates to different error handlers, depending on the exception
33+
* type. The delegates must have compatible properties ({@link #isAckAfterHandle()} etc.
34+
* {@link #deliveryAttemptHeader()} is not supported - always returns false.
35+
*
36+
* @author Gary Russell
37+
* @since 2.8
38+
*
39+
*/
40+
public class CommonDelegatingErrorHandler implements CommonErrorHandler {
41+
42+
private final CommonErrorHandler defaultErrorHandler;
43+
44+
private final Map<Class<? extends Throwable>, CommonErrorHandler> delegates = new LinkedHashMap<>();
45+
46+
/**
47+
* Construct an instance with a default error handler that will be invoked if the
48+
* exception has no matches.
49+
* @param defaultErrorHandler the default error handler.
50+
*/
51+
public CommonDelegatingErrorHandler(CommonErrorHandler defaultErrorHandler) {
52+
Assert.notNull(defaultErrorHandler, "'defaultErrorHandler' cannot be null");
53+
this.defaultErrorHandler = defaultErrorHandler;
54+
}
55+
56+
/**
57+
* Set the delegate error handlers; a {@link LinkedHashMap} argument is recommended so
58+
* that the delegates are searched in a known order.
59+
* @param delegates the delegates.
60+
*/
61+
public void setErrorHandlers(Map<Class<? extends Throwable>, CommonErrorHandler> delegates) {
62+
this.delegates.clear();
63+
this.delegates.putAll(delegates);
64+
checkDelegates();
65+
}
66+
67+
68+
@Override
69+
public boolean remainingRecords() {
70+
return this.defaultErrorHandler.remainingRecords();
71+
}
72+
73+
@Override
74+
public void clearThreadState() {
75+
this.defaultErrorHandler.clearThreadState();
76+
this.delegates.values().forEach(handler -> handler.clearThreadState());
77+
}
78+
79+
@Override
80+
public boolean isAckAfterHandle() {
81+
return this.defaultErrorHandler.isAckAfterHandle();
82+
}
83+
84+
@Override
85+
public void setAckAfterHandle(boolean ack) {
86+
this.defaultErrorHandler.setAckAfterHandle(ack);
87+
}
88+
89+
/**
90+
* Add a delegate to the end of the current collection.
91+
* @param throwable the throwable for this handler.
92+
* @param handler the handler.
93+
*/
94+
public void addDelegate(Class<? extends Throwable> throwable, CommonErrorHandler handler) {
95+
this.delegates.put(throwable, handler);
96+
checkDelegates();
97+
}
98+
99+
private void checkDelegates() {
100+
boolean remainingRecords = this.defaultErrorHandler.remainingRecords();
101+
boolean ackAfterHandle = this.defaultErrorHandler.isAckAfterHandle();
102+
this.delegates.values().forEach(handler -> {
103+
Assert.isTrue(remainingRecords == handler.remainingRecords(),
104+
"All delegates must return the same value when calling 'remainingRecords()'");
105+
Assert.isTrue(ackAfterHandle == handler.isAckAfterHandle(),
106+
"All delegates must return the same value when calling 'isAckAfterHandle()'");
107+
});
108+
}
109+
110+
@Override
111+
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
112+
Consumer<?, ?> consumer, MessageListenerContainer container) {
113+
114+
CommonErrorHandler handler = findDelegate(thrownException);
115+
if (handler != null) {
116+
handler.handleRemaining(thrownException, records, consumer, container);
117+
}
118+
else {
119+
this.defaultErrorHandler.handleRemaining(thrownException, records, consumer, container);
120+
}
121+
}
122+
123+
@Override
124+
public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
125+
MessageListenerContainer container, Runnable invokeListener) {
126+
127+
CommonErrorHandler handler = findDelegate(thrownException);
128+
if (handler != null) {
129+
handler.handleBatch(thrownException, data, consumer, container, invokeListener);
130+
}
131+
else {
132+
this.defaultErrorHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
133+
}
134+
}
135+
136+
@Override
137+
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
138+
MessageListenerContainer container) {
139+
140+
CommonErrorHandler handler = findDelegate(thrownException);
141+
if (handler != null) {
142+
handler.handleOtherException(thrownException, consumer, container);
143+
}
144+
else {
145+
this.defaultErrorHandler.handleOtherException(thrownException, consumer, container);
146+
}
147+
}
148+
149+
@Nullable
150+
private CommonErrorHandler findDelegate(Throwable thrownException) {
151+
Throwable cause = thrownException;
152+
if (cause instanceof ListenerExecutionFailedException) {
153+
cause = thrownException.getCause();
154+
}
155+
if (cause != null) {
156+
Class<? extends Throwable> causeClass = cause.getClass();
157+
for (Entry<Class<? extends Throwable>, CommonErrorHandler> entry : this.delegates.entrySet()) {
158+
if (entry.getKey().isAssignableFrom(causeClass)) {
159+
return entry.getValue();
160+
}
161+
}
162+
}
163+
return null;
164+
}
165+
166+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ConditionalDelegatingBatchErrorHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,13 @@
3030
* An error handler that delegates to different error handlers, depending on the exception
3131
* type.
3232
*
33+
* @deprecated in favor of {@link CommonDelegatingErrorHandler}.
34+
*
3335
* @author Gary Russell
3436
* @since 2.7.4
3537
*
3638
*/
39+
@Deprecated
3740
public class ConditionalDelegatingBatchErrorHandler implements ListenerInvokingBatchErrorHandler {
3841

3942
private final ContainerAwareBatchErrorHandler defaultErrorHandler;

spring-kafka/src/main/java/org/springframework/kafka/listener/ConditionalDelegatingErrorHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,13 @@
3131
* An error handler that delegates to different error handlers, depending on the exception
3232
* type.
3333
*
34+
* @deprecated in favor of {@link CommonDelegatingErrorHandler}.
35+
*
3436
* @author Gary Russell
3537
* @since 2.7.4
3638
*
3739
*/
40+
@Deprecated
3841
public class ConditionalDelegatingErrorHandler implements ContainerAwareErrorHandler {
3942

4043
private final ContainerAwareErrorHandler defaultErrorHandler;
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.List;
20+
21+
import org.apache.kafka.clients.consumer.Consumer;
22+
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
import org.apache.kafka.clients.consumer.ConsumerRecords;
24+
import org.apache.kafka.common.errors.SerializationException;
25+
26+
import org.springframework.lang.Nullable;
27+
import org.springframework.util.backoff.BackOff;
28+
29+
/**
30+
* An error handler that, for record listeners, seeks to the current offset for each topic
31+
* in the remaining records. Used to rewind partitions after a message failure so that it
32+
* can be replayed. For batch listeners, seeks to the current offset for each topic in a
33+
* batch of records. Used to rewind partitions after a message failure so that the batch
34+
* can be replayed. If the listener throws a {@link BatchListenerFailedException}, with
35+
* the failed record. The records before the record will have their offsets committed and
36+
* the partitions for the remaining records will be repositioned and/or the failed record
37+
* can be recovered and skipped. If some other exception is thrown, or a valid record is
38+
* not provided in the exception, error handling is delegated to a
39+
* {@link RetryingBatchErrorHandler} with this handler's {@link BackOff}. If the record is
40+
* recovered, its offset is committed. This is a replacement for the legacy
41+
* {@link SeekToCurrentErrorHandler} and {@link SeekToCurrentBatchErrorHandler} (but the
42+
* fallback now can send the messages to a recoverer after retries are completed instead
43+
* of retring indefinitely).
44+
*
45+
* @author Gary Russell
46+
*
47+
* @since 2.8
48+
*
49+
*/
50+
public class DefaultErrorHandler extends FailedBatchProcessor implements CommonErrorHandler {
51+
52+
private boolean ackAfterHandle = true;
53+
54+
/**
55+
* Construct an instance with the default recoverer which simply logs the record after
56+
* {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a
57+
* topic/partition/offset, with the default back off (9 retries, no delay).
58+
*/
59+
public DefaultErrorHandler() {
60+
this(null, SeekUtils.DEFAULT_BACK_OFF);
61+
}
62+
63+
/**
64+
* Construct an instance with the default recoverer which simply logs the record after
65+
* the backOff returns STOP for a topic/partition/offset.
66+
* @param backOff the {@link BackOff}.
67+
*/
68+
public DefaultErrorHandler(BackOff backOff) {
69+
this(null, backOff);
70+
}
71+
72+
/**
73+
* Construct an instance with the provided recoverer which will be called after
74+
* {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a
75+
* topic/partition/offset.
76+
* @param recoverer the recoverer.
77+
*/
78+
public DefaultErrorHandler(ConsumerRecordRecoverer recoverer) {
79+
this(recoverer, SeekUtils.DEFAULT_BACK_OFF);
80+
}
81+
82+
/**
83+
* Construct an instance with the provided recoverer which will be called after
84+
* the backOff returns STOP for a topic/partition/offset.
85+
* @param recoverer the recoverer; if null, the default (logging) recoverer is used.
86+
* @param backOff the {@link BackOff}.
87+
*/
88+
public DefaultErrorHandler(@Nullable ConsumerRecordRecoverer recoverer, BackOff backOff) {
89+
super(recoverer, backOff, createFallback(backOff, recoverer));
90+
}
91+
92+
private static CommonErrorHandler createFallback(BackOff backOff, ConsumerRecordRecoverer recoverer) {
93+
return new ErrorHandlerAdapter(new RetryingBatchErrorHandler(backOff, recoverer));
94+
}
95+
96+
/**
97+
* {@inheritDoc}
98+
* The container must be configured with
99+
* {@link org.springframework.kafka.listener.ContainerProperties.AckMode#MANUAL_IMMEDIATE}.
100+
* Whether or not the commit is sync or async depends on the container's syncCommits
101+
* property.
102+
* @param commitRecovered true to commit.
103+
*/
104+
@Override
105+
public void setCommitRecovered(boolean commitRecovered) { // NOSONAR enhanced javadoc
106+
super.setCommitRecovered(commitRecovered);
107+
}
108+
109+
@Override
110+
public boolean isAckAfterHandle() {
111+
return this.ackAfterHandle;
112+
}
113+
114+
@Override
115+
public void setAckAfterHandle(boolean ackAfterHandle) {
116+
this.ackAfterHandle = ackAfterHandle;
117+
}
118+
119+
@Override
120+
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
121+
Consumer<?, ?> consumer, MessageListenerContainer container) {
122+
123+
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
124+
getRecoveryStrategy(records, thrownException), this.logger, getLogLevel());
125+
}
126+
127+
@Override
128+
public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
129+
MessageListenerContainer container, Runnable invokeListener) {
130+
131+
doHandle(thrownException, data, consumer, container, invokeListener);
132+
}
133+
134+
@Override
135+
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
136+
MessageListenerContainer container) {
137+
138+
if (thrownException instanceof SerializationException) {
139+
throw new IllegalStateException("This error handler cannot process 'SerializationException's directly; "
140+
+ "please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key "
141+
+ "deserializer", thrownException);
142+
}
143+
else {
144+
throw new IllegalStateException("This error handler cannot process '"
145+
+ thrownException.getClass().getName()
146+
+ "'s; no record information is available", thrownException);
147+
}
148+
}
149+
150+
}

0 commit comments

Comments
 (0)