Skip to content

Commit b0be198

Browse files
garyrussellartembilan
authored andcommitted
spring-projectsGH-614: Fix general consumer error handling
Fixes spring-projects#614 PR spring-projects#595 added support for calling error handlers for general errors not related to listener invocation. However, the wrong `handle()` method was called. Add a default implementation of the lowest interface method in the hierarchies to `ErrorHandler` and `BatchErrorHandler` respectively and invoke that so the right method will always be invoked, regardless of the error handler type. Also see spring-projects#615
1 parent 5c78c30 commit b0be198

9 files changed

+83
-40
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchErrorHandler.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2016 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import org.apache.kafka.clients.consumer.Consumer;
1920
import org.apache.kafka.clients.consumer.ConsumerRecords;
2021

2122
/**
@@ -29,4 +30,16 @@
2930
*/
3031
public interface BatchErrorHandler extends GenericErrorHandler<ConsumerRecords<?, ?>> {
3132

33+
/**
34+
* Handle the exception.
35+
* @param thrownException the exception.
36+
* @param data the consumer records.
37+
* @param consumer the consumer.
38+
* @param container the container.
39+
*/
40+
default void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
41+
MessageListenerContainer container) {
42+
handle(thrownException, data);
43+
}
44+
3245
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareBatchErrorHandler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,4 +38,10 @@ default void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
3838
@Override
3939
void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer);
4040

41+
@Override
42+
default void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
43+
MessageListenerContainer container) {
44+
handle(thrownException, data, consumer);
45+
}
46+
4147
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareErrorHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.List;
20+
1921
import org.apache.kafka.clients.consumer.Consumer;
2022
import org.apache.kafka.clients.consumer.ConsumerRecord;
2123

@@ -38,4 +40,10 @@ default void handle(Exception thrownException, ConsumerRecord<?, ?> data) {
3840
@Override
3941
void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer);
4042

43+
@Override
44+
default void handle(Exception thrownException, List<ConsumerRecord<?, ?>> data, Consumer<?, ?> consumer,
45+
MessageListenerContainer container) {
46+
handle(thrownException, null, consumer);
47+
}
48+
4149
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerAwareBatchErrorHandler.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,13 +35,7 @@ default void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consu
3535
throw new UnsupportedOperationException("Container should never call this");
3636
}
3737

38-
/**
39-
* Handle the exception.
40-
* @param thrownException the exception.
41-
* @param data the consumer records.
42-
* @param consumer the consumer.
43-
* @param container the container.
44-
*/
38+
@Override
4539
void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
4640
MessageListenerContainer container);
4741

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerAwareErrorHandler.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,13 +39,7 @@ default void handle(Exception thrownException, List<ConsumerRecord<?, ?>> record
3939
throw new UnsupportedOperationException("Container should never call this");
4040
}
4141

42-
/**
43-
* Handle the exception.
44-
* @param thrownException the exception.
45-
* @param records the remaining records including the one that failed.
46-
* @param consumer the consumer.
47-
* @param container the container.
48-
*/
42+
@Override
4943
void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
5044
MessageListenerContainer container);
5145

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandler.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2016 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,9 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.List;
20+
21+
import org.apache.kafka.clients.consumer.Consumer;
1922
import org.apache.kafka.clients.consumer.ConsumerRecord;
2023

2124
/**
@@ -26,4 +29,16 @@
2629
*/
2730
public interface ErrorHandler extends GenericErrorHandler<ConsumerRecord<?, ?>> {
2831

32+
/**
33+
* Handle the exception.
34+
* @param thrownException the exception.
35+
* @param records the remaining records including the one that failed.
36+
* @param consumer the consumer.
37+
* @param container the container.
38+
*/
39+
default void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
40+
MessageListenerContainer container) {
41+
handle(thrownException, null);
42+
}
43+
2944
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -744,24 +744,7 @@ public void run() {
744744
break;
745745
}
746746
catch (Exception e) {
747-
try {
748-
GenericErrorHandler<?> containerErrorHandler = this.containerProperties.getGenericErrorHandler();
749-
if (containerErrorHandler != null) {
750-
if (containerErrorHandler instanceof ConsumerAwareErrorHandler
751-
|| containerErrorHandler instanceof ConsumerAwareBatchErrorHandler) {
752-
containerErrorHandler.handle(e, null, this.consumer);
753-
}
754-
else {
755-
containerErrorHandler.handle(e, null);
756-
}
757-
}
758-
else {
759-
this.logger.error("Container exception", e);
760-
}
761-
}
762-
catch (Exception ex) {
763-
this.logger.error("Container exception", ex);
764-
}
747+
handleConsumerException(e);
765748
}
766749
}
767750
ProducerFactoryUtils.clearConsumerGroupId();
@@ -788,6 +771,30 @@ public void run() {
788771
this.logger.info("Consumer stopped");
789772
}
790773

774+
/**
775+
* Handle exceptions thrown by the consumer outside of message listener
776+
* invocation (e.g. commit exceptions).
777+
* @param e the exception.
778+
*/
779+
protected void handleConsumerException(Exception e) {
780+
try {
781+
if (this.errorHandler != null) {
782+
this.errorHandler.handle(e, Collections.emptyList(), this.consumer,
783+
KafkaMessageListenerContainer.this);
784+
}
785+
else if (this.batchErrorHandler != null) {
786+
this.batchErrorHandler.handle(e, new ConsumerRecords<K, V>(Collections.emptyMap()), this.consumer,
787+
KafkaMessageListenerContainer.this);
788+
}
789+
else {
790+
this.logger.error("Consumer exception", e);
791+
}
792+
}
793+
catch (Exception ex) {
794+
this.logger.error("Consumer exception", ex);
795+
}
796+
}
797+
791798
private void commitPendingAcks() {
792799
processCommits();
793800
if (this.offsets.size() > 0) {

spring-kafka/src/main/java/org/springframework/kafka/listener/RemainingRecordsErrorHandler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -47,4 +47,10 @@ default void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consum
4747
*/
4848
void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer);
4949

50+
@Override
51+
default void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
52+
MessageListenerContainer container) {
53+
handle(thrownException, records, consumer);
54+
}
55+
5056
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1747,7 +1747,7 @@ public void testExceptionWhenCommitAfterRebalance() throws Exception {
17471747
Thread.sleep(3000);
17481748
}
17491749
catch (InterruptedException e) {
1750-
e.printStackTrace();
1750+
Thread.currentThread().interrupt();
17511751
}
17521752
});
17531753
containerProps.setSyncCommits(true);

0 commit comments

Comments
 (0)