Skip to content

Commit 6a24685

Browse files
breader124garyrussell
authored andcommitted
GH-2288: Delegating EH - Traverse Causes
Resolves #2288 GH-2288: Add option to deeply traverse exc chain Add possibility to deeply traverse cause chain in order to find proper delegate for handling thrown exception. Keep old way of cause chain traversing as default one. Cover new code with unit test. GH-2288: Make cause traversing more extensible Use BinaryExceptionClassifier while traversing cause chain to make it possible to classify throwables for handling based on inheritance etc. GH-2288: Remove custom BinaryExceptionClassifier Polish Javadocs.
1 parent 37910b6 commit 6a24685

File tree

2 files changed

+139
-8
lines changed

2 files changed

+139
-8
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java

+57-5
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,18 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.HashMap;
1920
import java.util.LinkedHashMap;
2021
import java.util.List;
2122
import java.util.Map;
2223
import java.util.Map.Entry;
24+
import java.util.stream.Collectors;
2325

2426
import org.apache.kafka.clients.consumer.Consumer;
2527
import org.apache.kafka.clients.consumer.ConsumerRecord;
2628
import org.apache.kafka.clients.consumer.ConsumerRecords;
2729

30+
import org.springframework.classify.BinaryExceptionClassifier;
2831
import org.springframework.lang.Nullable;
2932
import org.springframework.util.Assert;
3033

@@ -34,6 +37,7 @@
3437
* {@link #deliveryAttemptHeader()} is not supported - always returns false.
3538
*
3639
* @author Gary Russell
40+
* @author Adrian Chlebosz
3741
* @since 2.8
3842
*
3943
*/
@@ -43,6 +47,10 @@ public class CommonDelegatingErrorHandler implements CommonErrorHandler {
4347

4448
private final Map<Class<? extends Throwable>, CommonErrorHandler> delegates = new LinkedHashMap<>();
4549

50+
private boolean causeChainTraversing = false;
51+
52+
private BinaryExceptionClassifier classifier = new BinaryExceptionClassifier(new HashMap<>());
53+
4654
/**
4755
* Construct an instance with a default error handler that will be invoked if the
4856
* exception has no matches.
@@ -59,11 +67,30 @@ public CommonDelegatingErrorHandler(CommonErrorHandler defaultErrorHandler) {
5967
* @param delegates the delegates.
6068
*/
6169
public void setErrorHandlers(Map<Class<? extends Throwable>, CommonErrorHandler> delegates) {
70+
Assert.notNull(delegates, "'delegates' cannot be null");
6271
this.delegates.clear();
6372
this.delegates.putAll(delegates);
6473
checkDelegates();
74+
updateClassifier(delegates);
75+
}
76+
77+
private void updateClassifier(Map<Class<? extends Throwable>, CommonErrorHandler> delegates) {
78+
Map<Class<? extends Throwable>, Boolean> classifications = delegates.keySet().stream()
79+
.map(commonErrorHandler -> Map.entry(commonErrorHandler, true))
80+
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
81+
this.classifier = new BinaryExceptionClassifier(classifications);
6582
}
6683

84+
/**
85+
* Set the flag enabling deep exception's cause chain traversing. If true, the
86+
* delegate for the first exception classified by {@link BinaryExceptionClassifier}
87+
* will be retrieved.
88+
* @param causeChainTraversing the causeChainTraversing flag.
89+
* @since 2.8.8
90+
*/
91+
public void setCauseChainTraversing(boolean causeChainTraversing) {
92+
this.causeChainTraversing = causeChainTraversing;
93+
}
6794

6895
@SuppressWarnings("deprecation")
6996
@Override
@@ -79,7 +106,7 @@ public boolean seeksAfterHandling() {
79106
@Override
80107
public void clearThreadState() {
81108
this.defaultErrorHandler.clearThreadState();
82-
this.delegates.values().forEach(handler -> handler.clearThreadState());
109+
this.delegates.values().forEach(CommonErrorHandler::clearThreadState);
83110
}
84111

85112
@Override
@@ -158,10 +185,7 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
158185

159186
@Nullable
160187
private CommonErrorHandler findDelegate(Throwable thrownException) {
161-
Throwable cause = thrownException;
162-
if (cause instanceof ListenerExecutionFailedException) {
163-
cause = thrownException.getCause();
164-
}
188+
Throwable cause = findCause(thrownException);
165189
if (cause != null) {
166190
Class<? extends Throwable> causeClass = cause.getClass();
167191
for (Entry<Class<? extends Throwable>, CommonErrorHandler> entry : this.delegates.entrySet()) {
@@ -173,4 +197,32 @@ private CommonErrorHandler findDelegate(Throwable thrownException) {
173197
return null;
174198
}
175199

200+
@Nullable
201+
private Throwable findCause(Throwable thrownException) {
202+
if (this.causeChainTraversing) {
203+
return traverseCauseChain(thrownException);
204+
}
205+
return shallowTraverseCauseChain(thrownException);
206+
}
207+
208+
@Nullable
209+
private Throwable shallowTraverseCauseChain(Throwable thrownException) {
210+
Throwable cause = thrownException;
211+
if (cause instanceof ListenerExecutionFailedException) {
212+
cause = thrownException.getCause();
213+
}
214+
return cause;
215+
}
216+
217+
@Nullable
218+
private Throwable traverseCauseChain(Throwable thrownException) {
219+
while (thrownException != null && thrownException.getCause() != null) {
220+
if (this.classifier.classify(thrownException)) { // NOSONAR using Boolean here is not dangerous
221+
return thrownException;
222+
}
223+
thrownException = thrownException.getCause();
224+
}
225+
return thrownException;
226+
}
227+
176228
}

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/CommonDelegatingErrorHandlerTests.java

+82-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import static org.mockito.ArgumentMatchers.any;
2020
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.never;
2122
import static org.mockito.Mockito.verify;
2223

2324
import java.io.IOException;
@@ -29,12 +30,13 @@
2930
import org.junit.jupiter.api.Test;
3031

3132
import org.springframework.kafka.KafkaException;
33+
import org.springframework.kafka.core.KafkaProducerException;
3234

3335
/**
34-
* Tests for {@link CommonDelegatingErrorHandler}. Copied from
35-
* {@link ConditionalDelegatingErrorHandlerTests} with changed handler type.
36+
* Tests for {@link CommonDelegatingErrorHandler}.
3637
*
3738
* @author Gary Russell
39+
* @author Adrian Chlebosz
3840
* @since 2.8
3941
*
4042
*/
@@ -88,6 +90,83 @@ void testBatchDelegates() {
8890
verify(one).handleBatch(any(), any(), any(), any(), any());
8991
}
9092

93+
@Test
94+
void testDelegateForThrowableIsAppliedWhenCauseTraversingIsEnabled() {
95+
var defaultHandler = mock(CommonErrorHandler.class);
96+
97+
var directCauseErrorHandler = mock(CommonErrorHandler.class);
98+
var directCauseExc = new IllegalArgumentException();
99+
var errorHandler = mock(CommonErrorHandler.class);
100+
var exc = new UnsupportedOperationException(directCauseExc);
101+
102+
var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler);
103+
delegatingErrorHandler.setCauseChainTraversing(true);
104+
delegatingErrorHandler.setErrorHandlers(Map.of(
105+
exc.getClass(), errorHandler,
106+
directCauseExc.getClass(), directCauseErrorHandler
107+
));
108+
109+
delegatingErrorHandler.handleRemaining(directCauseExc, Collections.emptyList(), mock(Consumer.class),
110+
mock(MessageListenerContainer.class));
111+
112+
verify(directCauseErrorHandler).handleRemaining(any(), any(), any(), any());
113+
verify(errorHandler, never()).handleRemaining(any(), any(), any(), any());
114+
}
115+
116+
@Test
117+
void testDelegateForThrowableCauseIsAppliedWhenCauseTraversingIsEnabled() {
118+
var defaultHandler = mock(CommonErrorHandler.class);
119+
120+
var directCauseErrorHandler = mock(CommonErrorHandler.class);
121+
var directCauseExc = new IllegalArgumentException();
122+
var exc = new UnsupportedOperationException(directCauseExc);
123+
124+
var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler);
125+
delegatingErrorHandler.setCauseChainTraversing(true);
126+
delegatingErrorHandler.setErrorHandlers(Map.of(
127+
directCauseExc.getClass(), directCauseErrorHandler
128+
));
129+
130+
delegatingErrorHandler.handleRemaining(exc, Collections.emptyList(), mock(Consumer.class),
131+
mock(MessageListenerContainer.class));
132+
133+
verify(directCauseErrorHandler).handleRemaining(any(), any(), any(), any());
134+
}
135+
136+
@Test
137+
@SuppressWarnings("ConstantConditions")
138+
void testDelegateForClassifiableThrowableCauseIsAppliedWhenCauseTraversingIsEnabled() {
139+
var defaultHandler = mock(CommonErrorHandler.class);
140+
141+
var directCauseErrorHandler = mock(CommonErrorHandler.class);
142+
var directCauseExc = new KafkaProducerException(null, null, null);
143+
var exc = new UnsupportedOperationException(directCauseExc);
144+
145+
var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler);
146+
delegatingErrorHandler.setCauseChainTraversing(true);
147+
delegatingErrorHandler.setErrorHandlers(Map.of(
148+
KafkaException.class, directCauseErrorHandler
149+
));
150+
151+
delegatingErrorHandler.handleRemaining(exc, Collections.emptyList(), mock(Consumer.class),
152+
mock(MessageListenerContainer.class));
153+
154+
verify(directCauseErrorHandler).handleRemaining(any(), any(), any(), any());
155+
}
156+
157+
@Test
158+
@SuppressWarnings("ConstantConditions")
159+
void testDefaultDelegateIsApplied() {
160+
var defaultHandler = mock(CommonErrorHandler.class);
161+
var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler);
162+
delegatingErrorHandler.setCauseChainTraversing(true);
163+
164+
delegatingErrorHandler.handleRemaining(null, Collections.emptyList(), mock(Consumer.class),
165+
mock(MessageListenerContainer.class));
166+
167+
verify(defaultHandler).handleRemaining(any(), any(), any(), any());
168+
}
169+
91170
private Exception wrap(Exception ex) {
92171
return new ListenerExecutionFailedException("test", ex);
93172
}

0 commit comments

Comments
 (0)