Skip to content

Commit b2ffc44

Browse files
committed
spring-projectsGH-615: Complete CommonErrorHandler Work
Resolves spring-projects#615 - Deprecate remaining legacy error handlers - Fix `DefaultErrorHandler` for remaining records and delivery header - Docs
1 parent 50196bf commit b2ffc44

25 files changed

+249
-377
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 141 additions & 273 deletions
Large diffs are not rendered by default.

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,9 @@ See <<ooo-commits>> for more information.
2020

2121
You can now receive a single record, given the topic, partition and offset.
2222
See <<kafka-template-receive>> for more information.
23+
24+
[[x28-eh]]
25+
==== `CommonErrorHandler` Added
26+
27+
The legacy `GenericErrorHandler` and its sub-interface hierarchies for record an batch listeners have been replaced by a new single interface `CommonErrorHandler` with implementations corresponding to most legacy implementations of `GenericErrorHandler`.
28+
See <<error-handlers>> for more information.

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,16 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
116116
this.ackAfterHandle = ackAfterHandle;
117117
}
118118

119+
@Override
120+
public boolean remainingRecords() {
121+
return true;
122+
}
123+
124+
@Override
125+
public boolean deliveryAttemptHeader() {
126+
return true;
127+
}
128+
119129
@Override
120130
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
121131
Consumer<?, ?> consumer, MessageListenerContainer container) {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -810,7 +810,7 @@ private CommonErrorHandler determineCommonErrorHandler(@Nullable GenericErrorHan
810810
}
811811
if (this.isBatchListener) {
812812
validateErrorHandler(true);
813-
BatchErrorHandler batchErrorHandler = determineBatchErrorHandler(errHandler);
813+
BatchErrorHandler batchErrorHandler = (BatchErrorHandler) errHandler;
814814
if (batchErrorHandler != null) {
815815
return new ErrorHandlerAdapter(batchErrorHandler);
816816
}
@@ -820,7 +820,7 @@ private CommonErrorHandler determineCommonErrorHandler(@Nullable GenericErrorHan
820820
}
821821
else {
822822
validateErrorHandler(false);
823-
ErrorHandler eh = determineErrorHandler(errHandler);
823+
ErrorHandler eh = (ErrorHandler) errHandler;
824824
if (eh != null) {
825825
return new ErrorHandlerAdapter(eh);
826826
}
@@ -1120,18 +1120,6 @@ protected void checkConsumer() {
11201120
}
11211121
}
11221122

1123-
@Nullable
1124-
protected BatchErrorHandler determineBatchErrorHandler(@Nullable GenericErrorHandler<?> errHandler) {
1125-
return errHandler != null ? (BatchErrorHandler) errHandler
1126-
: this.transactionManager != null ? null : new RecoveringBatchErrorHandler();
1127-
}
1128-
1129-
@Nullable
1130-
protected ErrorHandler determineErrorHandler(@Nullable GenericErrorHandler<?> errHandler) {
1131-
return errHandler != null ? (ErrorHandler) errHandler
1132-
: this.transactionManager != null ? null : new SeekToCurrentErrorHandler();
1133-
}
1134-
11351123
@Nullable
11361124
private MicrometerHolder obtainMicrometerHolder() {
11371125
MicrometerHolder holder = null;

spring-kafka/src/main/java/org/springframework/kafka/listener/RecoveringBatchErrorHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,14 @@
3535
* exception, error handling is delegated to a {@link SeekToCurrentBatchErrorHandler} with
3636
* this handler's {@link BackOff}. If the record is recovered, its offset is committed.
3737
*
38+
* @deprecated in favor of {@link DefaultErrorHandler}.
39+
*
3840
* @author Gary Russell
3941
* @author Myeonghyeon Lee
4042
* @since 2.5
4143
*
4244
*/
45+
@Deprecated
4346
public class RecoveringBatchErrorHandler extends FailedBatchProcessor
4447
implements ContainerAwareBatchErrorHandler {
4548

spring-kafka/src/main/java/org/springframework/kafka/listener/RetryingBatchErrorHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class RetryingBatchErrorHandler extends KafkaExceptionLogLevelAware
4848

4949
private final BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer;
5050

51+
@SuppressWarnings("deprecation")
5152
private final CommonErrorHandler seeker = new ErrorHandlerAdapter(new SeekToCurrentBatchErrorHandler());
5253

5354
private boolean ackAfterHandle = true;

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,14 @@
3131
* An error handler that seeks to the current offset for each topic in a batch of records.
3232
* Used to rewind partitions after a message failure so that the batch can be replayed.
3333
*
34+
* @deprecated with no replacement - use {@link DefaultErrorHandler} with an infinite
35+
* {@link BackOff}.
36+
*
3437
* @author Gary Russell
3538
* @since 2.1
3639
*
3740
*/
41+
@Deprecated
3842
public class SeekToCurrentBatchErrorHandler extends KafkaExceptionLogLevelAware
3943
implements ContainerAwareBatchErrorHandler {
4044

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,15 @@
3030
* records. Used to rewind partitions after a message failure so that it can be
3131
* replayed.
3232
*
33+
* @deprecated in favor of {@link DefaultErrorHandler}.
34+
*
3335
* @author Gary Russell
3436
* @author Artem Bilan
3537
*
3638
* @since 2.0.1
3739
*
3840
*/
41+
@Deprecated
3942
public class SeekToCurrentErrorHandler extends FailedRecordProcessor implements ContainerAwareErrorHandler {
4043

4144
private boolean ackAfterHandle = true;

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,20 @@
3030
import org.springframework.core.log.LogAccessor;
3131
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
3232
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
33+
import org.springframework.kafka.listener.CommonErrorHandler;
3334
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
3435
import org.springframework.kafka.listener.ContainerProperties;
3536
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
36-
import org.springframework.kafka.listener.ErrorHandler;
37+
import org.springframework.kafka.listener.DefaultErrorHandler;
3738
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
38-
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
3939
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
4040
import org.springframework.util.Assert;
4141
import org.springframework.util.backoff.FixedBackOff;
4242

4343
/**
4444
*
4545
* Configures the provided {@link ConcurrentKafkaListenerContainerFactory} with a
46-
* {@link SeekToCurrentErrorHandler}, the {@link DeadLetterPublishingRecoverer} created by
46+
* {@link DefaultErrorHandler}, the {@link DeadLetterPublishingRecoverer} created by
4747
* the {@link DeadLetterPublishingRecovererFactory}.
4848
*
4949
* Mind that the same factory can be used by many different
@@ -76,7 +76,7 @@ public class ListenerContainerFactoryConfigurer {
7676
private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> {
7777
};
7878

79-
private Consumer<ErrorHandler> errorHandlerCustomizer = errorHandler -> {
79+
private Consumer<CommonErrorHandler> errorHandlerCustomizer = errorHandler -> {
8080
};
8181

8282
private final DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory;
@@ -108,12 +108,13 @@ public class ListenerContainerFactoryConfigurer {
108108
: doConfigure(containerFactory, Collections.emptyList());
109109
}
110110

111-
private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory,
112-
List<Long> backOffValues) {
113-
containerFactory.setContainerCustomizer(container ->
114-
setupBackoffAwareMessageListenerAdapter(container, backOffValues));
111+
private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
112+
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, List<Long> backOffValues) {
113+
114+
containerFactory
115+
.setContainerCustomizer(container -> setupBackoffAwareMessageListenerAdapter(container, backOffValues));
115116
containerFactory
116-
.setErrorHandler(createErrorHandler(this.deadLetterPublishingRecovererFactory.create()));
117+
.setCommonErrorHandler(createErrorHandler(this.deadLetterPublishingRecovererFactory.create()));
117118
return containerFactory;
118119
}
119120

@@ -135,12 +136,12 @@ public void setContainerCustomizer(Consumer<ConcurrentMessageListenerContainer<?
135136
this.containerCustomizer = containerCustomizer;
136137
}
137138

138-
public void setErrorHandlerCustomizer(Consumer<ErrorHandler> errorHandlerCustomizer) {
139+
public void setErrorHandlerCustomizer(Consumer<CommonErrorHandler> errorHandlerCustomizer) {
139140
this.errorHandlerCustomizer = errorHandlerCustomizer;
140141
}
141142

142-
private ErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
143-
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer,
143+
private CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
144+
DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer,
144145
new FixedBackOff(0, 0));
145146
errorHandler.setCommitRecovered(true);
146147
this.errorHandlerCustomizer.accept(errorHandler);

spring-kafka/src/test/java/org/springframework/kafka/annotation/StatefulRetryTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,8 +37,8 @@
3737
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
3838
import org.springframework.kafka.core.KafkaTemplate;
3939
import org.springframework.kafka.core.ProducerFactory;
40+
import org.springframework.kafka.listener.DefaultErrorHandler;
4041
import org.springframework.kafka.listener.MessageListenerContainer;
41-
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
4242
import org.springframework.kafka.test.EmbeddedKafkaBroker;
4343
import org.springframework.kafka.test.context.EmbeddedKafka;
4444
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -88,18 +88,18 @@ public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(EmbeddedKa
8888
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
8989
new ConcurrentKafkaListenerContainerFactory<>();
9090
factory.setConsumerFactory(consumerFactory(embeddedKafka));
91-
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler() {
91+
DefaultErrorHandler errorHandler = new DefaultErrorHandler() {
9292

9393
@Override
94-
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
94+
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
9595
Consumer<?, ?> consumer, MessageListenerContainer container) {
9696
Config.this.seekPerformed = true;
97-
super.handle(thrownException, records, consumer, container);
97+
super.handleRemaining(thrownException, records, consumer, container);
9898
}
9999

100100
};
101101
errorHandler.setLogLevel(Level.INFO);
102-
factory.setErrorHandler(errorHandler);
102+
factory.setCommonErrorHandler(errorHandler);
103103
factory.setStatefulRetry(true);
104104
factory.setRetryTemplate(new RetryTemplate());
105105
factory.setRecoveryCallback(c -> {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -917,8 +917,8 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
917917
containerProps.setClientId("clientId");
918918
KafkaMessageListenerContainer<Integer, String> container =
919919
new KafkaMessageListenerContainer<>(cf, containerProps);
920-
SeekToCurrentErrorHandler errorHandler = spy(new SeekToCurrentErrorHandler(new FixedBackOff(0L, 0)));
921-
container.setErrorHandler(errorHandler);
920+
DefaultErrorHandler errorHandler = spy(new DefaultErrorHandler(new FixedBackOff(0L, 0)));
921+
container.setCommonErrorHandler(errorHandler);
922922
container.start();
923923
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
924924
assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
@@ -927,7 +927,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
927927
inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class));
928928
inOrder.verify(consumer).commitSync(anyMap(), any());
929929
inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class));
930-
inOrder.verify(errorHandler).handle(any(), any(), any(), any());
930+
inOrder.verify(errorHandler).handleRemaining(any(), any(), any(), any());
931931
inOrder.verify(consumer).commitSync(anyMap(), any());
932932
container.stop();
933933
}
@@ -2879,7 +2879,6 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
28792879
KafkaMessageListenerContainer<Integer, String> container =
28802880
new KafkaMessageListenerContainer<>(cf, containerProps);
28812881
container.setBeanName("testContainerException");
2882-
container.setErrorHandler(new SeekToCurrentErrorHandler());
28832882
container.start();
28842883
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
28852884
template.sendDefault(0, 0, "a");
@@ -3438,7 +3437,7 @@ void commitAfterHandleManual() throws InterruptedException {
34383437
new KafkaMessageListenerContainer<>(cf, containerProps);
34393438
AtomicBoolean recovered = new AtomicBoolean();
34403439
CountDownLatch latch = new CountDownLatch(1);
3441-
container.setErrorHandler(new SeekToCurrentErrorHandler((rec, ex) -> {
3440+
container.setCommonErrorHandler(new DefaultErrorHandler((rec, ex) -> {
34423441
recovered.set(true);
34433442
latch.countDown();
34443443
},

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualAssignmentInitialSeekTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -200,7 +200,6 @@ public Consumer consumer() {
200200
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
201201
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
202202
factory.setConsumerFactory(consumerFactory());
203-
factory.setErrorHandler(new SeekToCurrentErrorHandler());
204203
factory.getContainerProperties().setAckMode(AckMode.RECORD);
205204
factory.getContainerProperties().setDeliveryAttemptHeader(true);
206205
return factory;

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -209,7 +209,6 @@ public Consumer consumer() {
209209
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
210210
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
211211
factory.setConsumerFactory(consumerFactory());
212-
factory.setErrorHandler(new SeekToCurrentErrorHandler());
213212
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
214213
factory.getContainerProperties().setMissingTopicsFatal(false);
215214
return factory;

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTxTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -74,6 +74,7 @@
7474
*/
7575
@SpringJUnitConfig
7676
@DirtiesContext
77+
@SuppressWarnings("deprecation")
7778
public class ManualNackRecordTxTests {
7879

7980
@SuppressWarnings("rawtypes")

spring-kafka/src/test/java/org/springframework/kafka/listener/RecoveringBatchErrorHandlerIntegrationTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
* @since 2.5
4949
*
5050
*/
51+
@SuppressWarnings("deprecation")
5152
@EmbeddedKafka(topics = {
5253
RecoveringBatchErrorHandlerIntegrationTests.topic1,
5354
RecoveringBatchErrorHandlerIntegrationTests.topic1DLT,

spring-kafka/src/test/java/org/springframework/kafka/listener/RecoveringBatchErrorHandlerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
*/
7171
@SpringJUnitConfig
7272
@DirtiesContext
73+
@SuppressWarnings("deprecation")
7374
public class RecoveringBatchErrorHandlerTests {
7475

7576
private static final String CONTAINER_ID = "container";

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandlerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -79,6 +79,7 @@
7979
*/
8080
@SpringJUnitConfig
8181
@DirtiesContext
82+
@SuppressWarnings("deprecation")
8283
public class SeekToCurrentBatchErrorHandlerTests {
8384

8485
private static final String CONTAINER_ID = "container";

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
* @since 2.3
4949
*
5050
*/
51+
@SuppressWarnings("deprecation")
5152
public class SeekToCurrentErrorHandlerTests {
5253

5354
@Test

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTXTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -72,6 +72,7 @@
7272
*/
7373
@SpringJUnitConfig
7474
@DirtiesContext
75+
@SuppressWarnings("deprecation")
7576
public class SeekToCurrentOnErrorBatchModeTXTests {
7677

7778
private static final String CONTAINER_ID = "container";
@@ -236,7 +237,7 @@ public Consumer consumer() {
236237
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
237238
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
238239
factory.setConsumerFactory(consumerFactory());
239-
factory.setErrorHandler(new SeekToCurrentErrorHandler());
240+
factory.setCommonErrorHandler(new DefaultErrorHandler());
240241
factory.getContainerProperties().setAckMode(AckMode.BATCH);
241242
factory.getContainerProperties().setTransactionManager(tm());
242243
return factory;

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -206,7 +206,6 @@ public Consumer consumer() {
206206
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
207207
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
208208
factory.setConsumerFactory(consumerFactory());
209-
factory.setErrorHandler(new SeekToCurrentErrorHandler());
210209
factory.getContainerProperties().setAckMode(AckMode.BATCH);
211210
return factory;
212211
}

0 commit comments

Comments
 (0)