Skip to content

Commit 4c180ef

Browse files
committed
spring-projectsGH-615: Add CommonLoggingErrorHandler
See spring-projects#615
1 parent cd661dc commit 4c180ef

12 files changed

+242
-23
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

+27-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.springframework.kafka.listener.AfterRollbackProcessor;
3838
import org.springframework.kafka.listener.BatchErrorHandler;
3939
import org.springframework.kafka.listener.BatchInterceptor;
40+
import org.springframework.kafka.listener.CommonErrorHandler;
4041
import org.springframework.kafka.listener.ContainerProperties;
4142
import org.springframework.kafka.listener.ErrorHandler;
4243
import org.springframework.kafka.listener.GenericErrorHandler;
@@ -75,6 +76,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
7576

7677
private GenericErrorHandler<?> errorHandler;
7778

79+
private CommonErrorHandler commonErrorHandler;
80+
7881
private ConsumerFactory<? super K, ? super V> consumerFactory;
7982

8083
private Boolean autoStartup;
@@ -262,6 +265,16 @@ public void setBatchErrorHandler(BatchErrorHandler errorHandler) {
262265
this.errorHandler = errorHandler;
263266
}
264267

268+
/**
269+
* Set the {@link CommonErrorHandler} which can handle errors for both record
270+
* and batch listeners. Replaces the use of {@link GenericErrorHandler}s.
271+
* @param commonErrorHandler the handler.
272+
* @since 2.8
273+
*/
274+
public void setCommonErrorHandler(CommonErrorHandler commonErrorHandler) {
275+
this.commonErrorHandler = commonErrorHandler;
276+
}
277+
265278
/**
266279
* Set a processor to invoke after a transaction rollback; typically will
267280
* seek the unprocessed topic/partition to reprocess the records.
@@ -342,7 +355,19 @@ public void setContainerCustomizer(ContainerCustomizer<K, V, C> containerCustomi
342355

343356
@Override
344357
public void afterPropertiesSet() {
345-
if (this.errorHandler != null) {
358+
if (this.commonErrorHandler != null) {
359+
if (Boolean.TRUE.equals(this.batchListener)) {
360+
Assert.state(this.commonErrorHandler.supportsBatch(),
361+
() -> "The common error handler must support batch listeners, it is a " +
362+
this.commonErrorHandler.getClass().getName());
363+
}
364+
else {
365+
Assert.state(this.commonErrorHandler.supportsRecord(),
366+
() -> "The common error handler must support record listeners, it is a " +
367+
this.commonErrorHandler.getClass().getName());
368+
}
369+
}
370+
if (this.commonErrorHandler == null && this.errorHandler != null) {
346371
if (Boolean.TRUE.equals(this.batchListener)) {
347372
Assert.state(this.errorHandler instanceof BatchErrorHandler,
348373
() -> "The error handler must be a BatchErrorHandler, not " +
@@ -412,6 +437,7 @@ protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
412437
.acceptIfNotNull(this.containerProperties.getSubBatchPerPartition(),
413438
properties::setSubBatchPerPartition)
414439
.acceptIfNotNull(this.errorHandler, instance::setGenericErrorHandler)
440+
.acceptIfNotNull(this.commonErrorHandler, instance::setCommonErrorHandler)
415441
.acceptIfNotNull(this.missingTopicsFatal, instance.getContainerProperties()::setMissingTopicsFatal);
416442
Boolean autoStart = endpoint.getAutoStartup();
417443
if (autoStart != null) {

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

+22
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ public abstract class AbstractMessageListenerContainer<K, V>
8888

8989
private GenericErrorHandler<?> errorHandler;
9090

91+
private CommonErrorHandler commonErrorHandler;
92+
9193
private boolean autoStartup = true;
9294

9395
private int phase = DEFAULT_PHASE;
@@ -236,6 +238,26 @@ public GenericErrorHandler<?> getGenericErrorHandler() {
236238
return this.errorHandler;
237239
}
238240

241+
/**
242+
* Get the {@link CommonErrorHandler}.
243+
* @return the handler.
244+
* @since 2.8
245+
*/
246+
@Nullable
247+
public CommonErrorHandler getCommonErrorHandler() {
248+
return this.commonErrorHandler;
249+
}
250+
251+
/**
252+
* Set the {@link CommonErrorHandler} which can handle errors for both record
253+
* and batch listeners. Replaces the use of {@link GenericErrorHandler}s.
254+
* @param commonErrorHandler the handler.
255+
* @since 2.8
256+
*/
257+
public void setCommonErrorHandler(@Nullable CommonErrorHandler commonErrorHandler) {
258+
this.commonErrorHandler = commonErrorHandler;
259+
}
260+
239261
@Override
240262
public boolean isAutoStartup() {
241263
return this.autoStartup;

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchLoggingErrorHandler.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,11 +25,14 @@
2525
import org.springframework.lang.Nullable;
2626

2727
/**
28-
* Simple handler that invokes a {@link LoggingErrorHandler} for each record.
28+
* Simple handler that logs each record.
29+
*
30+
* @deprecated - use the {@link CommonLoggingErrorHandler} instead.
2931
*
3032
* @author Gary Russell
3133
* @since 1.1
3234
*/
35+
@Deprecated
3336
public class BatchLoggingErrorHandler implements BatchErrorHandler {
3437

3538
private static final LogAccessor LOGGER =
@@ -43,7 +46,7 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> da
4346
}
4447
else {
4548
for (ConsumerRecord<?, ?> record : data) {
46-
message.append(record).append('\n');
49+
message.append(ListenerUtils.recordToString(record)).append('\n');
4750
}
4851
}
4952
LOGGER.error(thrownException, () -> message.substring(0, message.length() - 1));

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java

+40-2
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,50 @@
3434
*/
3535
public interface CommonErrorHandler extends DeliveryAttemptAware {
3636

37+
/**
38+
* The type of error handler.
39+
*/
40+
enum Type {
41+
42+
/**
43+
* Can handle errors for record listeners.
44+
*/
45+
RECORD,
46+
47+
/**
48+
* Can handle errors for batch listeners.
49+
*/
50+
BATCH,
51+
52+
/**
53+
* Can handle errors for both record and batch listeners.
54+
*/
55+
BOTH
56+
57+
}
58+
3759
/**
3860
* Return true if this error handler is for a batch listener.
3961
* @return true for batch.
4062
*/
41-
default boolean isBatch() {
42-
return false;
63+
default Type getType() {
64+
return Type.RECORD;
65+
}
66+
67+
/**
68+
* Return true if this handler supports record listeners.
69+
* @return true if record listeners are supported.
70+
*/
71+
default boolean supportsRecord() {
72+
return getType().equals(Type.RECORD) || getType().equals(Type.BOTH);
73+
}
74+
75+
/**
76+
* Return true if this handler supports batch listeners.
77+
* @return true if batch listeners are supported.
78+
*/
79+
default boolean supportsBatch() {
80+
return getType().equals(Type.BATCH) || getType().equals(Type.BOTH);
4381
}
4482

4583
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2015-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.apache.commons.logging.LogFactory;
20+
import org.apache.kafka.clients.consumer.Consumer;
21+
import org.apache.kafka.clients.consumer.ConsumerRecord;
22+
import org.apache.kafka.clients.consumer.ConsumerRecords;
23+
24+
import org.springframework.core.log.LogAccessor;
25+
26+
/**
27+
* The {@link CommonErrorHandler} implementation for logging exceptions.
28+
*
29+
* @author Gary Russell
30+
* @since 2.8
31+
*
32+
*/
33+
public class CommonLoggingErrorHandler implements CommonErrorHandler {
34+
35+
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(CommonLoggingErrorHandler.class));
36+
37+
@Override
38+
public Type getType() {
39+
return Type.BOTH;
40+
}
41+
42+
@Override
43+
public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
44+
MessageListenerContainer container) {
45+
46+
LOGGER.error(thrownException, () -> "Error occured while processing: " + ListenerUtils.recordToString(record));
47+
}
48+
49+
@Override
50+
public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
51+
MessageListenerContainer container, Runnable invokeListener) {
52+
53+
StringBuilder message = new StringBuilder("Error occurred while processing:\n");
54+
for (ConsumerRecord<?, ?> record : data) {
55+
message.append(ListenerUtils.recordToString(record)).append('\n');
56+
}
57+
LOGGER.error(thrownException, () -> message.substring(0, message.length() - 1));
58+
}
59+
60+
@Override
61+
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
62+
MessageListenerContainer container) {
63+
64+
LOGGER.error(thrownException, () -> "Error occurred while not processing records");
65+
}
66+
67+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

+1
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer<K,
221221
}
222222
container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + index : "");
223223
container.setGenericErrorHandler(getGenericErrorHandler());
224+
container.setCommonErrorHandler(getCommonErrorHandler());
224225
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
225226
container.setRecordInterceptor(getRecordInterceptor());
226227
container.setBatchInterceptor(getBatchInterceptor());

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlerAdapter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ public ErrorHandlerAdapter(BatchErrorHandler batchErrorHandler) {
6363
}
6464

6565
@Override
66-
public boolean isBatch() {
67-
return this.batchErrorHandler != null;
66+
public Type getType() {
67+
return this.batchErrorHandler != null ? Type.BATCH : Type.RECORD;
6868
}
6969

7070
@Override

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -734,7 +734,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
734734
this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties,
735735
KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties());
736736
subscribeOrAssignTopics(this.consumer);
737-
GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
737+
GenericErrorHandler<?> errHandler = getGenericErrorHandler();
738738
if (listener instanceof BatchMessageListener) {
739739
this.listener = null;
740740
this.batchListener = (BatchMessageListener<K, V>) listener;
@@ -798,6 +798,15 @@ else if (listener instanceof MessageListener) {
798798

799799
@Nullable
800800
private CommonErrorHandler determineCommonErrorHandler(GenericErrorHandler<?> errHandler) {
801+
CommonErrorHandler common = getCommonErrorHandler();
802+
if (common != null) {
803+
if (errHandler != null) {
804+
this.logger.debug("GenericErrorHandler is ignored when a CommonErrorHandler is provided");
805+
}
806+
Assert.state(this.isBatchListener ? common.supportsBatch() : common.supportsRecord(),
807+
"Error handler is not compatible with the message listener");
808+
return common;
809+
}
801810
if (this.isBatchListener) {
802811
validateErrorHandler(true);
803812
BatchErrorHandler batchErrorHandler = determineBatchErrorHandler(errHandler);
@@ -1111,13 +1120,13 @@ protected void checkConsumer() {
11111120
}
11121121

11131122
@Nullable
1114-
protected BatchErrorHandler determineBatchErrorHandler(GenericErrorHandler<?> errHandler) {
1123+
protected BatchErrorHandler determineBatchErrorHandler(@Nullable GenericErrorHandler<?> errHandler) {
11151124
return errHandler != null ? (BatchErrorHandler) errHandler
11161125
: this.transactionManager != null ? null : new RecoveringBatchErrorHandler();
11171126
}
11181127

11191128
@Nullable
1120-
protected ErrorHandler determineErrorHandler(GenericErrorHandler<?> errHandler) {
1129+
protected ErrorHandler determineErrorHandler(@Nullable GenericErrorHandler<?> errHandler) {
11211130
return errHandler != null ? (ErrorHandler) errHandler
11221131
: this.transactionManager != null ? null : new SeekToCurrentErrorHandler();
11231132
}

spring-kafka/src/main/java/org/springframework/kafka/listener/LoggingErrorHandler.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,21 +21,23 @@
2121

2222
import org.springframework.core.log.LogAccessor;
2323
import org.springframework.lang.Nullable;
24-
import org.springframework.util.ObjectUtils;
2524

2625
/**
2726
* The {@link ErrorHandler} implementation for logging purpose.
2827
*
28+
* @deprecated - use the {@link CommonLoggingErrorHandler} instead.
29+
*
2930
* @author Marius Bogoevici
3031
* @author Gary Russell
3132
*/
33+
@Deprecated
3234
public class LoggingErrorHandler implements ErrorHandler {
3335

3436
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(LoggingErrorHandler.class));
3537

3638
@Override
3739
public void handle(Exception thrownException, @Nullable ConsumerRecord<?, ?> record) {
38-
LOGGER.error(thrownException, () -> "Error while processing: " + ObjectUtils.nullSafeToString(record));
40+
LOGGER.error(thrownException, () -> "Error while processing: " + ListenerUtils.recordToString(record));
3941
}
4042

4143
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

+5
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
import org.springframework.kafka.event.ListenerContainerIdleEvent;
9797
import org.springframework.kafka.event.ListenerContainerNoLongerIdleEvent;
9898
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
99+
import org.springframework.kafka.listener.CommonLoggingErrorHandler;
99100
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
100101
import org.springframework.kafka.listener.ConsumerAwareErrorHandler;
101102
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
@@ -871,6 +872,9 @@ public void testKeyConversion() throws Exception {
871872
.collect(Collectors.toList()), e);
872873
throw e;
873874
}
875+
MessageListenerContainer container = this.registry.getListenerContainer("bytesKey");
876+
assertThat(KafkaTestUtils.getPropertyValue(container, "commonErrorHandler"))
877+
.isInstanceOf(CommonLoggingErrorHandler.class);
874878
}
875879

876880
@Test
@@ -1107,6 +1111,7 @@ public KafkaListenerContainerFactory<?> bytesStringListenerContainerFactory() {
11071111
this.intercepted = true;
11081112
return record;
11091113
});
1114+
factory.setCommonErrorHandler(new CommonLoggingErrorHandler());
11101115
return factory;
11111116
}
11121117

0 commit comments

Comments
 (0)