Skip to content

Commit 0782b1c

Browse files
committed
spring-projectsGH-2288: Add option to deeply traverse exc chain
Fixes spring-projects#2288 Add possibility to deeply traverse cause chain in order to find proper delegate for handling thrown exception. Keep old way of cause chain traversing as default one. Cover new code with unit test.
1 parent 70a7274 commit 0782b1c

File tree

2 files changed

+83
-8
lines changed

2 files changed

+83
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java

+39-5
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
* {@link #deliveryAttemptHeader()} is not supported - always returns false.
3535
*
3636
* @author Gary Russell
37+
* @author Adrian Chlebosz
3738
* @since 2.8
3839
*
3940
*/
@@ -43,6 +44,8 @@ public class CommonDelegatingErrorHandler implements CommonErrorHandler {
4344

4445
private final Map<Class<? extends Throwable>, CommonErrorHandler> delegates = new LinkedHashMap<>();
4546

47+
private boolean deepCauseChainTraversing = false;
48+
4649
/**
4750
* Construct an instance with a default error handler that will be invoked if the
4851
* exception has no matches.
@@ -64,6 +67,15 @@ public void setErrorHandlers(Map<Class<? extends Throwable>, CommonErrorHandler>
6467
checkDelegates();
6568
}
6669

70+
/**
71+
* Set the flag enabling deep exception's cause chain traversing. If true, delegate
72+
* for initial {@link Throwable} will be retrieved. Initial exception is an exception
73+
* without cause or with unknown cause.
74+
* @param deepCauseChainTraversing the deepCauseChainTraversing flag.
75+
*/
76+
public void setDeepCauseChainTraversing(boolean deepCauseChainTraversing) {
77+
this.deepCauseChainTraversing = deepCauseChainTraversing;
78+
}
6779

6880
@SuppressWarnings("deprecation")
6981
@Override
@@ -79,7 +91,7 @@ public boolean seeksAfterHandling() {
7991
@Override
8092
public void clearThreadState() {
8193
this.defaultErrorHandler.clearThreadState();
82-
this.delegates.values().forEach(handler -> handler.clearThreadState());
94+
this.delegates.values().forEach(CommonErrorHandler::clearThreadState);
8395
}
8496

8597
@Override
@@ -158,10 +170,7 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
158170

159171
@Nullable
160172
private CommonErrorHandler findDelegate(Throwable thrownException) {
161-
Throwable cause = thrownException;
162-
if (cause instanceof ListenerExecutionFailedException) {
163-
cause = thrownException.getCause();
164-
}
173+
Throwable cause = findCause(thrownException);
165174
if (cause != null) {
166175
Class<? extends Throwable> causeClass = cause.getClass();
167176
for (Entry<Class<? extends Throwable>, CommonErrorHandler> entry : this.delegates.entrySet()) {
@@ -173,4 +182,29 @@ private CommonErrorHandler findDelegate(Throwable thrownException) {
173182
return null;
174183
}
175184

185+
@Nullable
186+
private Throwable findCause(Throwable thrownException) {
187+
if (this.deepCauseChainTraversing) {
188+
return deepTraverseCauseChain(thrownException);
189+
}
190+
return shallowTraverseCauseChain(thrownException);
191+
}
192+
193+
@Nullable
194+
private Throwable shallowTraverseCauseChain(Throwable thrownException) {
195+
Throwable cause = thrownException;
196+
if (cause instanceof ListenerExecutionFailedException) {
197+
cause = thrownException.getCause();
198+
}
199+
return cause;
200+
}
201+
202+
@Nullable
203+
private Throwable deepTraverseCauseChain(Throwable thrownException) {
204+
while (thrownException != null && thrownException.getCause() != null) {
205+
thrownException = thrownException.getCause();
206+
}
207+
return thrownException;
208+
}
209+
176210
}

spring-kafka/src/test/java/org/springframework/kafka/listener/CommonDelegatingErrorHandlerTests.java

+44-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import static org.mockito.ArgumentMatchers.any;
2020
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.never;
2122
import static org.mockito.Mockito.verify;
2223

2324
import java.io.IOException;
@@ -31,10 +32,10 @@
3132
import org.springframework.kafka.KafkaException;
3233

3334
/**
34-
* Tests for {@link CommonDelegatingErrorHandler}. Copied from
35-
* {@link ConditionalDelegatingErrorHandlerTests} with changed handler type.
35+
* Tests for {@link CommonDelegatingErrorHandler}.
3636
*
3737
* @author Gary Russell
38+
* @author Adrian Chlebosz
3839
* @since 2.8
3940
*
4041
*/
@@ -88,6 +89,46 @@ void testBatchDelegates() {
8889
verify(one).handleBatch(any(), any(), any(), any(), any());
8990
}
9091

92+
@Test
93+
void testDelegateForInitialCauseOfThrowableIsAppliedWhenDeepTraversingFlagIsSet() {
94+
var defaultHandler = mock(CommonErrorHandler.class);
95+
96+
var initErrorHandler = mock(CommonErrorHandler.class);
97+
var initExc = new IllegalStateException();
98+
var directCauseErrorHandler = mock(CommonErrorHandler.class);
99+
var directCauseExc = new IllegalArgumentException(initExc);
100+
var errorHandler = mock(CommonErrorHandler.class);
101+
var exc = new UnsupportedOperationException(directCauseExc);
102+
103+
var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler);
104+
delegatingErrorHandler.setDeepCauseChainTraversing(true);
105+
delegatingErrorHandler.setErrorHandlers(Map.of(
106+
initExc.getClass(), initErrorHandler,
107+
directCauseExc.getClass(), directCauseErrorHandler,
108+
exc.getClass(), errorHandler
109+
));
110+
111+
delegatingErrorHandler.handleRemaining(exc, Collections.emptyList(), mock(Consumer.class),
112+
mock(MessageListenerContainer.class));
113+
114+
verify(initErrorHandler).handleRemaining(any(), any(), any(), any());
115+
verify(directCauseErrorHandler, never()).handleRemaining(any(), any(), any(), any());
116+
verify(errorHandler, never()).handleRemaining(any(), any(), any(), any());
117+
}
118+
119+
@Test
120+
@SuppressWarnings("ConstantConditions")
121+
void testDefaultDelegateIsAppliedWhenDeepTraversingFlagIsSetAndNullThrowableHasBeenPassed() {
122+
var defaultHandler = mock(CommonErrorHandler.class);
123+
var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler);
124+
delegatingErrorHandler.setDeepCauseChainTraversing(true);
125+
126+
delegatingErrorHandler.handleRemaining(null, Collections.emptyList(), mock(Consumer.class),
127+
mock(MessageListenerContainer.class));
128+
129+
verify(defaultHandler).handleRemaining(any(), any(), any(), any());
130+
}
131+
91132
private Exception wrap(Exception ex) {
92133
return new ListenerExecutionFailedException("test", ex);
93134
}

0 commit comments

Comments
 (0)