Skip to content

Commit 7af2a1c

Browse files
authored
GH-1886: Deprecate RetryTemplate
Resolves #1886 * Missed a deprecation warning. * Use FQCN.
1 parent c43a265 commit 7af2a1c

File tree

7 files changed

+18
-99
lines changed

7 files changed

+18
-99
lines changed

Diff for: spring-kafka-docs/src/main/asciidoc/kafka.adoc

+1-93
Original file line numberDiff line numberDiff line change
@@ -2310,99 +2310,7 @@ IMPORTANT: The `FilteringBatchMessageListenerAdapter` is ignored if your `@Kafka
23102310
[[retrying-deliveries]]
23112311
===== Retrying Deliveries
23122312

2313-
If your listener throws an exception, the default behavior is to invoke the <<error-handlers>>, if configured, or logged otherwise.
2314-
2315-
NOTE:
2316-
To retry deliveries, a convenient listener adapter `RetryingMessageListenerAdapter` is provided.
2317-
2318-
You can configure it with a `RetryTemplate` and `RecoveryCallback<Void>` - see the https://github.com/spring-projects/spring-retry[spring-retry] project for information about these components.
2319-
If a recovery callback is not provided, the exception is thrown to the container after retries are exhausted.
2320-
In that case, the `ErrorHandler` is invoked, if configured, or logged otherwise.
2321-
2322-
When you use `@KafkaListener`, you can set the `RetryTemplate` (and optionally `recoveryCallback`) on the container factory.
2323-
When you do so, the listener is wrapped in the appropriate retrying adapter.
2324-
2325-
The contents of the `RetryContext` passed into the `RecoveryCallback` depend on the type of listener.
2326-
The context always has a `record` attribute, which is the record for which the failure occurred.
2327-
If your listener is acknowledging or consumer aware, additional `acknowledgment` or `consumer` attributes are available.
2328-
For convenience, the `RetryingMessageListenerAdapter` provides static constants for these keys.
2329-
See its https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/adapter/AbstractRetryingMessageListenerAdapter.html[Javadoc] for more information.
2330-
2331-
A retry adapter is not provided for any of the batch <<message-listeners,message listeners>>, because the framework has no knowledge of where in a batch the failure occurred.
2332-
If you need retry capabilities when you use a batch listener, we recommend that you use a `RetryTemplate` within the listener itself.
2333-
2334-
[[stateful-retry]]
2335-
===== Stateful Retry
2336-
2337-
IMPORTANT: Now that the `SeekToCurrentErrorHandler` can be configured with a `BackOff` and has the ability to retry only certain exceptions (since version 2.3), the use of stateful retry, via the listener adapter retry configuration, is no longer necessary.
2338-
You can provide the same functionality with appropriate configuration of the error handler and remove all retry configuration from the listener adapter.
2339-
See <<seek-to-current>> for more information.
2340-
2341-
You should understand that the retry discussed in the <<retrying-deliveries,preceding section>> suspends the consumer thread (if a `BackOffPolicy` is used).
2342-
There are no calls to `Consumer.poll()` during the retries.
2343-
Kafka has two properties to determine consumer health.
2344-
The `session.timeout.ms` is used to determine if the consumer is active.
2345-
Since `kafka-clients` version `0.10.1.0`, heartbeats are sent on a background thread, so a slow consumer no longer affects that.
2346-
`max.poll.interval.ms` (default: five minutes) is used to determine if a consumer appears to be hung (taking too long to process records from the last poll).
2347-
If the time between `poll()` calls exceeds this, the broker revokes the assigned partitions and performs a rebalance.
2348-
For lengthy retry sequences, with back off, this can easily happen.
2349-
2350-
Since version 2.1.3, you can avoid this problem by using stateful retry in conjunction with a `SeekToCurrentErrorHandler`.
2351-
In this case, each delivery attempt throws the exception back to the container, the error handler re-seeks the unprocessed offsets, and the same message is redelivered by the next `poll()`.
2352-
This avoids the problem of exceeding the `max.poll.interval.ms` property (as long as an individual delay between attempts does not exceed it).
2353-
So, when you use an `ExponentialBackOffPolicy`, you must ensure that the `maxInterval` is less than the `max.poll.interval.ms` property.
2354-
To enable stateful retry, you can use the `RetryingMessageListenerAdapter` constructor that takes a `stateful` `boolean` argument (set it to `true`).
2355-
When you configure the listener container factory (for `@KafkaListener`), set the factory's `statefulRetry` property to `true`.
2356-
2357-
IMPORTANT: Version 2.2 added recovery to the `SeekToCurrentErrorHandler`, such as sending a failed record to a dead-letter topic.
2358-
When using stateful retry, you must perform the recovery in the retry `RecoveryCallback` and NOT in the error handler.
2359-
Otherwise, if the recovery is done in the error handler, the retry template's state will never be cleared.
2360-
Also, you must ensure that the `maxFailures` in the `SeekToCurrentErrorHandler` must be at least as many as configured in the retry policy, again to ensure that the retries are exhausted and the state cleared.
2361-
Here is an example for retry configuration when used with a `SeekToCurrentErrorHandler` where `factory` is the `ConcurrentKafkaListenerContainerFactory`.
2362-
2363-
====
2364-
[source, java]
2365-
----
2366-
@Autowired
2367-
DeadLetterPublishingRecoverer recoverer;
2368-
2369-
...
2370-
factory.setRetryTemplate(new RetryTemplate()); // 3 retries by default
2371-
factory.setStatefulRetry(true);
2372-
factory.setRecoveryCallback(context -> {
2373-
recoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
2374-
(Exception) context.getLastThrowable());
2375-
return null;
2376-
});
2377-
...
2378-
2379-
@Bean
2380-
public SeekToCurrentErrorHandler eh() {
2381-
return new SeekToCurrentErrorHandler(new FixedBackOff(0L, 3L)); // at least 3
2382-
}
2383-
----
2384-
====
2385-
2386-
However, see the note at the beginning of this section; you can avoid using the `RetryTemplate` altogether.
2387-
2388-
IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks.
2389-
Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
2390-
With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
2391-
To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` to `false`.
2392-
2393-
Starting with version 2.6, you can now provide the error handler with a `BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>` to determine the `BackOff` to use, based on the failed record and/or the exception:
2394-
2395-
====
2396-
[source, java]
2397-
----
2398-
handler.setBackOffFunction((record, ex) -> { ... });
2399-
----
2400-
====
2401-
2402-
If the function returns `null`, the handler's default `BackOff` will be used.
2403-
2404-
Starting with version 2.6.3, set `resetStateOnExceptionChange` to `true` and the retry sequence will be restarted (including the selection of a new `BackOff`, if so configured) if the exception type changes between failures.
2405-
By default, the exception type is not considered.
2313+
See the `SeekToCurrentErrorHandler` in <<annotation-error-handling>>.
24062314

24072315
[[sequencing]]
24082316
===== Starting `@KafkaListener` s in Sequence

Diff for: spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

+3
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,9 @@ public void setAckDiscarded(Boolean ackDiscarded) {
180180
/**
181181
* Set a retryTemplate.
182182
* @param retryTemplate the template.
183+
* @deprecated since 2.8 - use a suitably configured error handler instead.
183184
*/
185+
@Deprecated
184186
public void setRetryTemplate(RetryTemplate retryTemplate) {
185187
this.retryTemplate = retryTemplate;
186188
}
@@ -385,6 +387,7 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
385387
return instance;
386388
}
387389

390+
@SuppressWarnings("deprecation")
388391
private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint) {
389392
JavaUtils.INSTANCE
390393
.acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::setRecordFilterStrategy)

Diff for: spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2020 the original author or authors.
2+
* Copyright 2014-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -45,7 +45,6 @@
4545
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
4646
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
4747
import org.springframework.kafka.listener.adapter.ReplyHeadersConfigurer;
48-
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
4948
import org.springframework.kafka.support.TopicPartitionOffset;
5049
import org.springframework.kafka.support.converter.MessageConverter;
5150
import org.springframework.lang.Nullable;
@@ -324,7 +323,9 @@ protected RetryTemplate getRetryTemplate() {
324323
/**
325324
* Set a retryTemplate.
326325
* @param retryTemplate the template.
326+
* @deprecated since 2.8 - use a suitably configured error handler instead.
327327
*/
328+
@Deprecated
328329
public void setRetryTemplate(RetryTemplate retryTemplate) {
329330
this.retryTemplate = retryTemplate;
330331
}
@@ -498,7 +499,7 @@ public void setupListenerContainer(MessageListenerContainer listenerContainer,
498499
protected abstract MessagingMessageListenerAdapter<K, V> createMessageListener(MessageListenerContainer container,
499500
@Nullable MessageConverter messageConverter);
500501

501-
@SuppressWarnings("unchecked")
502+
@SuppressWarnings({ "unchecked", "deprecation" })
502503
private void setupMessageListener(MessageListenerContainer container,
503504
@Nullable MessageConverter messageConverter) {
504505

@@ -514,7 +515,8 @@ private void setupMessageListener(MessageListenerContainer container,
514515
"A 'RetryTemplate' is not supported with a batch listener; consider configuring the container "
515516
+ "with a suitably configured 'SeekToCurrentBatchErrorHandler' instead");
516517
if (this.retryTemplate != null) {
517-
messageListener = new RetryingMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
518+
messageListener = new org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter<>(
519+
(MessageListener<K, V>) messageListener,
518520
this.retryTemplate, this.recoveryCallback, this.statefulRetry);
519521
}
520522
if (this.recordFilterStrategy != null) {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RetryingMessageListenerAdapter.java

+3
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,12 @@
3535
* @param <K> the key type.
3636
* @param <V> the value type.
3737
*
38+
* @deprecated since 2.8 - use a suitably configured error handler instead.
39+
*
3840
* @author Gary Russell
3941
*
4042
*/
43+
@Deprecated
4144
public class RetryingMessageListenerAdapter<K, V>
4245
extends AbstractRetryingMessageListenerAdapter<K, V, MessageListener<K, V>>
4346
implements AcknowledgingConsumerAwareMessageListener<K, V> {

Diff for: spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@
111111
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
112112
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
113113
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
114-
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
115114
import org.springframework.kafka.support.Acknowledgment;
116115
import org.springframework.kafka.support.KafkaHeaders;
117116
import org.springframework.kafka.support.KafkaNull;
@@ -255,6 +254,7 @@ public void testAnonymous() {
255254
container.stop();
256255
}
257256

257+
@SuppressWarnings("deprecation")
258258
@Test
259259
public void testSimple() throws Exception {
260260
this.recordFilter.called = false;
@@ -303,7 +303,7 @@ public void testSimple() throws Exception {
303303
assertThat(KafkaTestUtils.getPropertyValue(manualContainer, "containerProperties.messageListener.ackDiscarded",
304304
Boolean.class)).isTrue();
305305
assertThat(KafkaTestUtils.getPropertyValue(manualContainer, "containerProperties.messageListener.delegate"))
306-
.isInstanceOf(RetryingMessageListenerAdapter.class);
306+
.isInstanceOf(org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.class);
307307
assertThat(KafkaTestUtils
308308
.getPropertyValue(manualContainer, "containerProperties.messageListener.delegate.recoveryCallback")
309309
.getClass().getName()).contains("EnableKafkaIntegrationTests$Config$");
@@ -1197,6 +1197,7 @@ public KafkaListenerContainerFactory<?> batchManualFactory2() {
11971197
}
11981198

11991199
@Bean
1200+
@SuppressWarnings("deprecation")
12001201
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
12011202
kafkaManualAckListenerContainerFactory() {
12021203

Diff for: spring-kafka/src/test/java/org/springframework/kafka/annotation/StatefulRetryTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public static class Config {
8282

8383
private boolean seekPerformed;
8484

85+
@SuppressWarnings("deprecation")
8586
@Bean
8687
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(EmbeddedKafkaBroker embeddedKafka) {
8788
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/RetryingMessageListenerAdapterTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
* @since 2.0
3838
*
3939
*/
40+
@SuppressWarnings("deprecation")
4041
public class RetryingMessageListenerAdapterTests {
4142

4243
@Test

0 commit comments

Comments
 (0)