Skip to content

Commit a7af914

Browse files
jgslimamatera-jgui
andauthored
GH-2496: Reuse retry topic for maxInterval delay (#2497)
* GH-2496: Reuse retry topic for maxInterval delay Resolves #2496 Created SameIntervalTopicReuseStrategy, used to keep retrying in the same retry topic when the delays of the last retries are the same. * GH-2496 Reference documentation typos. * GH-2496: Deprecation of FixedDelayStrategy. RetryTopic docs and code fixes and small refactorings. * GH-2496: Restoring DestinationTopic.isSingleTopicRetry() * GH-2496: Restoring DestinationTopic.Type.SINGLE_TOPIC_RETRY. * GH-2496: Removal of "forRemoval" for FixedDelayStrategy * GH-2496: Supressing deprecation warnings * GH-2496 Ajusting new method isRetryTopic(). * GH-2496: Documentation and formatting --------- Co-authored-by: João Guilherme de Souza Lima <[email protected]>
1 parent 1278a9c commit a7af914

15 files changed

+675
-107
lines changed

Diff for: spring-kafka-docs/src/main/asciidoc/index.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ ifdef::backend-pdf[]
1717
NOTE: This documentation is also available as https://docs.spring.io/spring-kafka/docs/{project-version}/reference/html/index.html[HTML].
1818
endif::[]
1919

20-
(C) 2016 - 2022 VMware, Inc.
20+
(C) 2016 - 2023 VMware, Inc.
2121

2222
Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.
2323

Diff for: spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

+114-37
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,8 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPo
146146
----
147147
====
148148

149-
NOTE: The retry topics' and dlt's consumers will be assigned to a consumer group with a group id that is the combination of the one with you provide in the `groupId` parameter of the `@KafkaListener` annotation with the topic's suffix. If you don't provide any they'll all belong to the same group, and rebalance on a retry topic will cause an unnecessary rebalance on the main topic.
149+
NOTE: The retry topics' and dlt's consumers will be assigned to a consumer group with a group id that is the combination of the one with you provide in the `groupId` parameter of the `@KafkaListener` annotation with the topic's suffix.
150+
If you don't provide any they'll all belong to the same group, and rebalance on a retry topic will cause an unnecessary rebalance on the main topic.
150151

151152
IMPORTANT: If the consumer is configured with an <<error-handling-deserializer,`ErrorHandlingDeserializer`>>, to handle deserilialization exceptions, it is important to configure the `KafkaTemplate` and its producer with a serializer that can handle normal objects as well as raw `byte[]` values, which result from deserialization exceptions.
152153
The generic value type of the template should be `Object`.
@@ -401,39 +402,6 @@ If your back off policy requires delays with values bigger than that, adjust the
401402

402403
IMPORTANT: The first attempt counts against `maxAttempts`, so if you provide a `maxAttempts` value of 4 there'll be the original attempt plus 3 retries.
403404

404-
===== Single Topic Fixed Delay Retries
405-
406-
If you're using fixed delay policies such as `FixedBackOffPolicy` or `NoBackOffPolicy` you can use a single topic to accomplish the non-blocking retries.
407-
This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended.
408-
409-
====
410-
[source, java]
411-
----
412-
@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
413-
@KafkaListener(topics = "my-annotated-topic")
414-
public void processMessage(MyPojo message) {
415-
// ... message processing
416-
}
417-
----
418-
====
419-
420-
====
421-
[source, java]
422-
----
423-
@Bean
424-
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
425-
return RetryTopicConfigurationBuilder
426-
.newInstance()
427-
.fixedBackoff(3000)
428-
.maxAttempts(5)
429-
.useSingleTopicForFixedDelays()
430-
.create(template);
431-
}
432-
----
433-
====
434-
435-
NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, ...
436-
437405
===== Global timeout
438406

439407
You can set the global timeout for the retrying process.
@@ -693,6 +661,12 @@ Examples:
693661

694662
"my-other-topic" -> "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", ..., "my-topic-myDltSuffix".
695663

664+
NOTE: The default behavior is to create separate retry topics for each attempt, appended with an index value: retry-0, retry-1, ..., retry-n.
665+
Therefore, by default the number of retry topics is the configured `maxAttempts` minus 1.
666+
667+
You can <<retry-topics-and-dlt-suffixes,configure the suffixes>>, choose whether to append <<append-index-or-delay,the attempt index or delay>>, use a <<single-topic-fixed-delay,single retry topic when using fixed backoff>>, and use a <<single-topic-maxinterval-delay,single retry topic for the attempts with the maxInterval>> when using exponential backoffs.
668+
669+
[[retry-topics-and-dlt-suffixes]]
696670
===== Retry Topics and Dlt Suffixes
697671

698672
You can specify the suffixes that will be used by the retry and dlt topics.
@@ -724,6 +698,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> t
724698

725699
NOTE: The default suffixes are "-retry" and "-dlt", for retry topics and dlt respectively.
726700

701+
[[append-index-or-delay]]
727702
===== Appending the Topic's Index or Delay
728703

729704
You can either append the topic's index or delay values after the suffix.
@@ -754,9 +729,108 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> templa
754729

755730
NOTE: The default behavior is to suffix with the delay values, except for fixed delay configurations with multiple topics, in which case the topics are suffixed with the topic's index.
756731

732+
[[single-topic-fixed-delay]]
733+
===== Single Topic for Fixed Delay Retries
734+
735+
If you're using fixed delay policies such as `FixedBackOffPolicy` or `NoBackOffPolicy` you can use a single topic to accomplish the non-blocking retries.
736+
This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended.
737+
738+
NOTE: `FixedDelayStrategy` is now deprecated, and will be replaced by `SameIntervalTopicReuseStrategy` in a future release.
739+
740+
====
741+
[source, java]
742+
----
743+
@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
744+
@KafkaListener(topics = "my-annotated-topic")
745+
public void processMessage(MyPojo message) {
746+
// ... message processing
747+
}
748+
----
749+
====
750+
751+
====
752+
[source, java]
753+
----
754+
@Bean
755+
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
756+
return RetryTopicConfigurationBuilder
757+
.newInstance()
758+
.fixedBackoff(3000)
759+
.maxAttempts(5)
760+
.useSingleTopicForFixedDelays()
761+
.create(template);
762+
}
763+
----
764+
====
765+
766+
NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, ...
767+
768+
769+
[[single-topic-maxinterval-delay]]
770+
===== Single Topic for maxInterval Exponential Delay
771+
772+
If you're using exponential backoff policy (`ExponentialBackOffPolicy`), you can use a single retry topic to accomplish the non-blocking retries of the attempts whose delays are the configured `maxInterval`.
773+
774+
This "final" retry topic will be suffixed with the provided or default suffix, and will have either the index or the `maxInterval` value appended.
775+
776+
NOTE: By opting to use a single topic for the retries with the `maxInterval` delay, it may become more viable to configure an exponential retry policy that keeps retrying for a long time, because in this approach you do not need a large amount of topics.
777+
778+
The default behavior is to work with the number of retry topics equal to the configured `maxAttempts` minus 1 and, when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topic (corresponding to the `maxInterval` delay) being suffixed with an additional index.
779+
780+
For instance, when configuring the exponential backoff with `initialInterval=1000`, `multiplier=2`, and `maxInterval=16000`, in order to keep trying for one hour, one would need to configure `maxAttempts` as 229, and by default the needed retry topics would be:
781+
782+
* -retry-1000
783+
* -retry-2000
784+
* -retry-4000
785+
* -retry-8000
786+
* -retry-16000-0
787+
* -retry-16000-1
788+
* -retry-16000-2
789+
* ...
790+
* -retry-16000-224
791+
792+
When using the strategy that reuses the retry topic for the same intervals, in the same configuration above the needed retry topics would be:
793+
794+
* -retry-1000
795+
* -retry-2000
796+
* -retry-4000
797+
* -retry-8000
798+
* -retry-16000
799+
800+
This will be the default in a future release.
801+
802+
====
803+
[source, java]
804+
----
805+
@RetryableTopic(attempts = 230,
806+
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 16000),
807+
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
808+
@KafkaListener(topics = "my-annotated-topic")
809+
public void processMessage(MyPojo message) {
810+
// ... message processing
811+
}
812+
----
813+
====
814+
815+
====
816+
[source, java]
817+
----
818+
@Bean
819+
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
820+
return RetryTopicConfigurationBuilder
821+
.newInstance()
822+
.exponentialBackoff(1000, 2, 16000)
823+
.maxAttempts(230)
824+
.useSingleTopicForSameIntervals()
825+
.create(template);
826+
}
827+
----
828+
====
829+
757830
===== Custom naming strategies
758831

759-
More complex naming strategies can be accomplished by registering a bean that implements `RetryTopicNamesProviderFactory`. The default implementation is `SuffixingRetryTopicNamesProviderFactory` and a different implementation can be registered in the following way:
832+
More complex naming strategies can be accomplished by registering a bean that implements `RetryTopicNamesProviderFactory`.
833+
The default implementation is `SuffixingRetryTopicNamesProviderFactory` and a different implementation can be registered in the following way:
760834

761835
====
762836
[source, java]
@@ -836,7 +910,9 @@ The framework will configure and use a separate set of retry topics for each lis
836910

837911
==== Dlt Strategies
838912

839-
The framework provides a few strategies for working with DLTs. You can provide a method for DLT processing, use the default logging method, or have no DLT at all. Also you can choose what happens if DLT processing fails.
913+
The framework provides a few strategies for working with DLTs.
914+
You can provide a method for DLT processing, use the default logging method, or have no DLT at all.
915+
Also you can choose what happens if DLT processing fails.
840916

841917
===== Dlt Processing Method
842918

@@ -1045,7 +1121,8 @@ Use the `DestinationTopicResolver` interface if you need to weigh in these facto
10451121
[[change-kboe-logging-level]]
10461122
==== Changing KafkaBackOffException Logging Level
10471123

1048-
When a message in the retry topic is not due for consumption, a `KafkaBackOffException` is thrown. Such exceptions are logged by default at `DEBUG` level, but you can change this behavior by setting an error handler customizer in the `ListenerContainerFactoryConfigurer` in a `@Configuration` class.
1124+
When a message in the retry topic is not due for consumption, a `KafkaBackOffException` is thrown.
1125+
Such exceptions are logged by default at `DEBUG` level, but you can change this behavior by setting an error handler customizer in the `ListenerContainerFactoryConfigurer` in a `@Configuration` class.
10491126

10501127
For example, to change the logging level to WARN you might add:
10511128

Diff for: spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,8 +23,8 @@
2323
import java.lang.annotation.Target;
2424

2525
import org.springframework.kafka.retrytopic.DltStrategy;
26-
import org.springframework.kafka.retrytopic.FixedDelayStrategy;
2726
import org.springframework.kafka.retrytopic.RetryTopicConstants;
27+
import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy;
2828
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
2929
import org.springframework.retry.annotation.Backoff;
3030

@@ -38,6 +38,7 @@
3838
* @author Tomaz Fernandes
3939
* @author Gary Russell
4040
* @author Fabio da Silva Jr.
41+
* @author João Lima
4142
* @since 2.7
4243
*
4344
* @see org.springframework.kafka.retrytopic.RetryTopicConfigurer
@@ -177,6 +178,18 @@
177178
*/
178179
TopicSuffixingStrategy topicSuffixingStrategy() default TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE;
179180

181+
182+
/**
183+
* Topic reuse strategy for sequential attempts made with a same backoff interval.
184+
*
185+
* <p>Note: for fixed backoffs, when this is configured as
186+
* {@link SameIntervalTopicReuseStrategy#SINGLE_TOPIC}, it has precedence over
187+
* the configuration in {@link #fixedDelayTopicStrategy()}.
188+
* @return the strategy.
189+
* @since 3.0.4
190+
*/
191+
SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy() default SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS;
192+
180193
/**
181194
* Whether or not create a DLT, and redeliver to the DLT if delivery fails or just give up.
182195
* @return the dlt strategy.
@@ -186,8 +199,10 @@
186199
/**
187200
* Whether to use a single or multiple topics when using a fixed delay.
188201
* @return the fixed delay strategy.
202+
* @deprecated in favor of {@link #sameIntervalTopicReuseStrategy()}.
189203
*/
190-
FixedDelayStrategy fixedDelayTopicStrategy() default FixedDelayStrategy.MULTIPLE_TOPICS;
204+
@Deprecated
205+
org.springframework.kafka.retrytopic.FixedDelayStrategy fixedDelayTopicStrategy() default org.springframework.kafka.retrytopic.FixedDelayStrategy.MULTIPLE_TOPICS;
191206

192207
/**
193208
* Override the container factory's {@code autoStartup} property for just the DLT container.

Diff for: spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -101,6 +101,7 @@ public RetryableTopicAnnotationProcessor(BeanFactory beanFactory, BeanExpression
101101
this.expressionContext = expressionContext;
102102
}
103103

104+
@SuppressWarnings("deprecation")
104105
public RetryTopicConfiguration processAnnotation(String[] topics, Method method, RetryableTopic annotation,
105106
Object bean) {
106107

@@ -146,6 +147,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
146147
.dltProcessingFailureStrategy(annotation.dltStrategy())
147148
.autoStartDltHandler(autoStartDlt)
148149
.setTopicSuffixingStrategy(annotation.topicSuffixingStrategy())
150+
.sameIntervalTopicReuseStrategy(annotation.sameIntervalTopicReuseStrategy())
149151
.timeoutAfter(timeout)
150152
.create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics));
151153
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@
3535
import org.springframework.kafka.listener.ExceptionClassifier;
3636
import org.springframework.kafka.listener.ListenerExecutionFailedException;
3737
import org.springframework.kafka.listener.TimestampedException;
38+
import org.springframework.kafka.retrytopic.DestinationTopic.Type;
3839
import org.springframework.lang.Nullable;
3940
import org.springframework.util.Assert;
4041

@@ -132,8 +133,10 @@ && isNotFatalException(e)
132133
: destinationTopicHolder.getNextDestination();
133134
}
134135

136+
@SuppressWarnings("deprecation")
135137
private DestinationTopic resolveRetryDestination(DestinationTopicHolder destinationTopicHolder) {
136-
return destinationTopicHolder.getSourceDestination().isSingleTopicRetry()
138+
return ((destinationTopicHolder.getSourceDestination().isReusableRetryTopic()) ||
139+
(destinationTopicHolder.getSourceDestination().isSingleTopicRetry()))
137140
? destinationTopicHolder.getSourceDestination()
138141
: destinationTopicHolder.getNextDestination();
139142
}
@@ -192,13 +195,26 @@ public void addDestinationTopics(String mainListenerId, List<DestinationTopic> d
192195
throw new IllegalStateException("Cannot add new destinations, "
193196
+ DefaultDestinationTopicResolver.class.getSimpleName() + " is already refreshed.");
194197
}
198+
validateDestinations(destinationsToAdd);
195199
synchronized (this.sourceDestinationsHolderMap) {
196200
Map<String, DestinationTopicHolder> map = this.sourceDestinationsHolderMap.computeIfAbsent(mainListenerId,
197201
id -> new HashMap<>());
198202
map.putAll(correlatePairSourceAndDestinationValues(destinationsToAdd));
199203
}
200204
}
201205

206+
private void validateDestinations(List<DestinationTopic> destinationsToAdd) {
207+
for (int i = 0; i < destinationsToAdd.size(); i++) {
208+
DestinationTopic destination = destinationsToAdd.get(i);
209+
if (destination.isReusableRetryTopic()) {
210+
Assert.isTrue((i == (destinationsToAdd.size() - 1) ||
211+
((i == (destinationsToAdd.size() - 2)) && (destinationsToAdd.get(i + 1).isDltTopic()))),
212+
String.format("In the destination topic chain, the type %s can only be "
213+
+ "specified as the last retry topic.", Type.REUSABLE_RETRY_TOPIC));
214+
}
215+
}
216+
}
217+
202218
private Map<String, DestinationTopicHolder> correlatePairSourceAndDestinationValues(
203219
List<DestinationTopic> destinationList) {
204220
return IntStream

0 commit comments

Comments
 (0)