Skip to content

Commit 6116966

Browse files
garyrussellartembilan
authored andcommitted
GH-667: Fix BatchErrorHandler for generic errors
Fixes #667 When catching exceptions outside of the listener (e.g. commit errors), the `BatchErrorHandler` was never called since the record error handler gets a logging handler. Check listener type before invoking the error handler for consumer errors.
1 parent 8c8a9ae commit 6116966

File tree

2 files changed

+50
-2
lines changed

2 files changed

+50
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -781,11 +781,11 @@ public void run() {
781781
*/
782782
protected void handleConsumerException(Exception e) {
783783
try {
784-
if (this.errorHandler != null) {
784+
if (!this.isBatchListener && this.errorHandler != null) {
785785
this.errorHandler.handle(e, Collections.emptyList(), this.consumer,
786786
KafkaMessageListenerContainer.this);
787787
}
788-
else if (this.batchErrorHandler != null) {
788+
else if (this.isBatchListener && this.batchErrorHandler != null) {
789789
this.batchErrorHandler.handle(e, new ConsumerRecords<K, V>(Collections.emptyMap()), this.consumer,
790790
KafkaMessageListenerContainer.this);
791791
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

+48
Original file line numberDiff line numberDiff line change
@@ -1967,6 +1967,54 @@ public void testAckModeCount() throws Exception {
19671967
container.stop();
19681968
}
19691969

1970+
@SuppressWarnings({ "unchecked", "rawtypes" })
1971+
@Test
1972+
public void testCommitErrorHandlerCalled() throws Exception {
1973+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
1974+
Consumer<Integer, String> consumer = mock(Consumer.class);
1975+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull())).willReturn(consumer);
1976+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
1977+
records.put(new TopicPartition("foo", 0), Arrays.asList(
1978+
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
1979+
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
1980+
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
1981+
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
1982+
AtomicBoolean first = new AtomicBoolean(true);
1983+
given(consumer.poll(anyLong())).willAnswer(i -> {
1984+
Thread.sleep(50);
1985+
return first.getAndSet(false) ? consumerRecords : emptyRecords;
1986+
});
1987+
willAnswer(i -> {
1988+
throw new RuntimeException("Commit failed");
1989+
}).given(consumer).commitSync(any(Map.class));
1990+
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
1991+
new TopicPartitionInitialOffset("foo", 0) };
1992+
ContainerProperties containerProps = new ContainerProperties(topicPartition);
1993+
containerProps.setGroupId("grp");
1994+
containerProps.setClientId("clientId");
1995+
containerProps.setIdleEventInterval(100L);
1996+
containerProps.setMessageListener((MessageListener) r -> { });
1997+
KafkaMessageListenerContainer<Integer, String> container =
1998+
new KafkaMessageListenerContainer<>(cf, containerProps);
1999+
final CountDownLatch ehl = new CountDownLatch(1);
2000+
container.setErrorHandler((r, t) -> {
2001+
ehl.countDown();
2002+
});
2003+
container.start();
2004+
assertThat(ehl.await(10, TimeUnit.SECONDS)).isTrue();
2005+
container.stop();
2006+
containerProps.setMessageListener((BatchMessageListener) r -> { });
2007+
container = new KafkaMessageListenerContainer<>(cf, containerProps);
2008+
final CountDownLatch behl = new CountDownLatch(1);
2009+
container.setBatchErrorHandler((r, t) -> {
2010+
behl.countDown();
2011+
});
2012+
first.set(true);
2013+
container.start();
2014+
assertThat(behl.await(10, TimeUnit.SECONDS)).isTrue();
2015+
container.stop();
2016+
}
2017+
19702018
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
19712019
Consumer<?, ?> consumer = spy(
19722020
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class));

0 commit comments

Comments
 (0)