Skip to content

Commit fcaeb24

Browse files
committed
spring-projectsGH-2288: Make cause traversing more extensible
Use BinaryExceptionClassifier while traversing cause chain to make it possible to classify throwables for handling based on inheritance etc.
1 parent 0782b1c commit fcaeb24

File tree

2 files changed

+83
-20
lines changed

2 files changed

+83
-20
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java

+32-7
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,18 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.HashMap;
1920
import java.util.LinkedHashMap;
2021
import java.util.List;
2122
import java.util.Map;
2223
import java.util.Map.Entry;
24+
import java.util.stream.Collectors;
2325

2426
import org.apache.kafka.clients.consumer.Consumer;
2527
import org.apache.kafka.clients.consumer.ConsumerRecord;
2628
import org.apache.kafka.clients.consumer.ConsumerRecords;
2729

30+
import org.springframework.classify.BinaryExceptionClassifier;
2831
import org.springframework.lang.Nullable;
2932
import org.springframework.util.Assert;
3033

@@ -44,7 +47,8 @@ public class CommonDelegatingErrorHandler implements CommonErrorHandler {
4447

4548
private final Map<Class<? extends Throwable>, CommonErrorHandler> delegates = new LinkedHashMap<>();
4649

47-
private boolean deepCauseChainTraversing = false;
50+
private boolean causeChainTraversing = false;
51+
private NoTraversingBinaryExceptionClassifier classifier = new NoTraversingBinaryExceptionClassifier(new HashMap<>());
4852

4953
/**
5054
* Construct an instance with a default error handler that will be invoked if the
@@ -62,19 +66,28 @@ public CommonDelegatingErrorHandler(CommonErrorHandler defaultErrorHandler) {
6266
* @param delegates the delegates.
6367
*/
6468
public void setErrorHandlers(Map<Class<? extends Throwable>, CommonErrorHandler> delegates) {
69+
Assert.notNull(delegates, "'delegates' cannot be null");
6570
this.delegates.clear();
6671
this.delegates.putAll(delegates);
6772
checkDelegates();
73+
updateClassifier(delegates);
74+
}
75+
76+
private void updateClassifier(Map<Class<? extends Throwable>, CommonErrorHandler> delegates) {
77+
Map<Class<? extends Throwable>, Boolean> classifications = delegates.keySet().stream()
78+
.map(commonErrorHandler -> Map.entry(commonErrorHandler, true))
79+
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
80+
this.classifier = new NoTraversingBinaryExceptionClassifier(classifications);
6881
}
6982

7083
/**
7184
* Set the flag enabling deep exception's cause chain traversing. If true, delegate
7285
* for initial {@link Throwable} will be retrieved. Initial exception is an exception
7386
* without cause or with unknown cause.
74-
* @param deepCauseChainTraversing the deepCauseChainTraversing flag.
87+
* @param causeChainTraversing the deepCauseChainTraversing flag.
7588
*/
76-
public void setDeepCauseChainTraversing(boolean deepCauseChainTraversing) {
77-
this.deepCauseChainTraversing = deepCauseChainTraversing;
89+
public void setCauseChainTraversing(boolean causeChainTraversing) {
90+
this.causeChainTraversing = causeChainTraversing;
7891
}
7992

8093
@SuppressWarnings("deprecation")
@@ -184,8 +197,8 @@ private CommonErrorHandler findDelegate(Throwable thrownException) {
184197

185198
@Nullable
186199
private Throwable findCause(Throwable thrownException) {
187-
if (this.deepCauseChainTraversing) {
188-
return deepTraverseCauseChain(thrownException);
200+
if (this.causeChainTraversing) {
201+
return traverseCauseChain(thrownException);
189202
}
190203
return shallowTraverseCauseChain(thrownException);
191204
}
@@ -200,11 +213,23 @@ private Throwable shallowTraverseCauseChain(Throwable thrownException) {
200213
}
201214

202215
@Nullable
203-
private Throwable deepTraverseCauseChain(Throwable thrownException) {
216+
private Throwable traverseCauseChain(Throwable thrownException) {
204217
while (thrownException != null && thrownException.getCause() != null) {
218+
if (this.classifier.classify(thrownException)) { // NOSONAR using Boolean here is not dangerous
219+
return thrownException;
220+
}
205221
thrownException = thrownException.getCause();
206222
}
207223
return thrownException;
208224
}
209225

226+
@SuppressWarnings("serial")
227+
private static final class NoTraversingBinaryExceptionClassifier extends BinaryExceptionClassifier {
228+
229+
NoTraversingBinaryExceptionClassifier(Map<Class<? extends Throwable>, Boolean> typeMap) {
230+
super(typeMap, false, false);
231+
}
232+
233+
}
234+
210235
}

spring-kafka/src/test/java/org/springframework/kafka/listener/CommonDelegatingErrorHandlerTests.java

+51-13
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.junit.jupiter.api.Test;
3131

3232
import org.springframework.kafka.KafkaException;
33+
import org.springframework.kafka.core.KafkaProducerException;
3334

3435
/**
3536
* Tests for {@link CommonDelegatingErrorHandler}.
@@ -90,38 +91,75 @@ void testBatchDelegates() {
9091
}
9192

9293
@Test
93-
void testDelegateForInitialCauseOfThrowableIsAppliedWhenDeepTraversingFlagIsSet() {
94+
void testDelegateForThrowableIsAppliedWhenCauseTraversingIsEnabled() {
9495
var defaultHandler = mock(CommonErrorHandler.class);
9596

96-
var initErrorHandler = mock(CommonErrorHandler.class);
97-
var initExc = new IllegalStateException();
9897
var directCauseErrorHandler = mock(CommonErrorHandler.class);
99-
var directCauseExc = new IllegalArgumentException(initExc);
98+
var directCauseExc = new IllegalArgumentException();
10099
var errorHandler = mock(CommonErrorHandler.class);
101100
var exc = new UnsupportedOperationException(directCauseExc);
102101

103102
var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler);
104-
delegatingErrorHandler.setDeepCauseChainTraversing(true);
103+
delegatingErrorHandler.setCauseChainTraversing(true);
105104
delegatingErrorHandler.setErrorHandlers(Map.of(
106-
initExc.getClass(), initErrorHandler,
107-
directCauseExc.getClass(), directCauseErrorHandler,
108-
exc.getClass(), errorHandler
105+
exc.getClass(), errorHandler,
106+
directCauseExc.getClass(), directCauseErrorHandler
109107
));
110108

111-
delegatingErrorHandler.handleRemaining(exc, Collections.emptyList(), mock(Consumer.class),
109+
delegatingErrorHandler.handleRemaining(directCauseExc, Collections.emptyList(), mock(Consumer.class),
112110
mock(MessageListenerContainer.class));
113111

114-
verify(initErrorHandler).handleRemaining(any(), any(), any(), any());
115-
verify(directCauseErrorHandler, never()).handleRemaining(any(), any(), any(), any());
112+
verify(directCauseErrorHandler).handleRemaining(any(), any(), any(), any());
116113
verify(errorHandler, never()).handleRemaining(any(), any(), any(), any());
117114
}
118115

116+
@Test
117+
void testDelegateForThrowableCauseIsAppliedWhenCauseTraversingIsEnabled() {
118+
var defaultHandler = mock(CommonErrorHandler.class);
119+
120+
var directCauseErrorHandler = mock(CommonErrorHandler.class);
121+
var directCauseExc = new IllegalArgumentException();
122+
var exc = new UnsupportedOperationException(directCauseExc);
123+
124+
var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler);
125+
delegatingErrorHandler.setCauseChainTraversing(true);
126+
delegatingErrorHandler.setErrorHandlers(Map.of(
127+
directCauseExc.getClass(), directCauseErrorHandler
128+
));
129+
130+
delegatingErrorHandler.handleRemaining(exc, Collections.emptyList(), mock(Consumer.class),
131+
mock(MessageListenerContainer.class));
132+
133+
verify(directCauseErrorHandler).handleRemaining(any(), any(), any(), any());
134+
}
135+
136+
@Test
137+
@SuppressWarnings("ConstantConditions")
138+
void testDelegateForClassifiableThrowableCauseIsAppliedWhenCauseTraversingIsEnabled() {
139+
var defaultHandler = mock(CommonErrorHandler.class);
140+
141+
var directCauseErrorHandler = mock(CommonErrorHandler.class);
142+
var directCauseExc = new KafkaProducerException(null, null, null);
143+
var exc = new UnsupportedOperationException(directCauseExc);
144+
145+
var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler);
146+
delegatingErrorHandler.setCauseChainTraversing(true);
147+
delegatingErrorHandler.setErrorHandlers(Map.of(
148+
KafkaException.class, directCauseErrorHandler
149+
));
150+
151+
delegatingErrorHandler.handleRemaining(exc, Collections.emptyList(), mock(Consumer.class),
152+
mock(MessageListenerContainer.class));
153+
154+
verify(directCauseErrorHandler).handleRemaining(any(), any(), any(), any());
155+
}
156+
119157
@Test
120158
@SuppressWarnings("ConstantConditions")
121-
void testDefaultDelegateIsAppliedWhenDeepTraversingFlagIsSetAndNullThrowableHasBeenPassed() {
159+
void testDefaultDelegateIsApplied() {
122160
var defaultHandler = mock(CommonErrorHandler.class);
123161
var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler);
124-
delegatingErrorHandler.setDeepCauseChainTraversing(true);
162+
delegatingErrorHandler.setCauseChainTraversing(true);
125163

126164
delegatingErrorHandler.handleRemaining(null, Collections.emptyList(), mock(Consumer.class),
127165
mock(MessageListenerContainer.class));

0 commit comments

Comments
 (0)