Skip to content

Commit fec109b

Browse files
committed
Set RetryTopicConfigurationBuilder default to reuse same topic for identical intervals
1 parent 637a0d0 commit fec109b

10 files changed

+66
-40
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/how-the-pattern-works.adoc

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ The retry topic consumer then checks the timestamp and if it's not due it pauses
66
When it is due the partition consumption is resumed, and the message is consumed again.
77
If the message processing fails again the message will be forwarded to the next retry topic, and the pattern is repeated until a successful processing occurs, or the attempts are exhausted, and the message is sent to the Dead Letter Topic (if configured).
88

9+
NOTE: Starting with version 3.2, the default behavior is to reuse the retry topic for the same intervals.
10+
911
To illustrate, if you have a "main-topic" topic, and want to set up non-blocking retry with an exponential backoff of 1000ms with a multiplier of 2 and 4 max attempts, it will create the main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000 and main-topic-dlt topics and configure the respective consumers.
1012
The framework also takes care of creating the topics and setting up and configuring the listeners.
1113

spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/topic-naming.adoc

+40-15
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ Examples:
99

1010
"my-other-topic" -> "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", ..., "my-topic-myDltSuffix"
1111

12-
NOTE: The default behavior is to create separate retry topics for each attempt, appended with an index value: retry-0, retry-1, ..., retry-n.
13-
Therefore, by default the number of retry topics is the configured `maxAttempts` minus 1.
12+
NOTE: Starting with version 3.2, the default behavior is to reuse the retry topic for the same intervals. Alternatively, you can configure separate retry topics for each attempt, appended with an index(or delay) value: retry-0, retry-1, ..., retry-n, by using `SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS`.
13+
If you choose this strategy, the number of retry topics will, by default, be `maxAttempts` minus 1.
1414

1515
You can xref:retrytopic/topic-naming.adoc#retry-topics-and-dlt-suffixes[configure the suffixes], choose whether to append xref:retrytopic/topic-naming.adoc#append-index-or-delay[the attempt index or delay], use a xref:retrytopic/topic-naming.adoc#single-topic-fixed-delay[single retry topic when using fixed backoff], and use a xref:retrytopic/topic-naming.adoc#single-topic-maxinterval-delay[single retry topic for the attempts with the maxInterval] when using exponential backoffs.
1616

@@ -69,8 +69,8 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> templa
6969

7070
NOTE: The default behavior is to suffix with the delay values, except for fixed delay configurations with multiple topics, in which case the topics are suffixed with the topic's index.
7171

72-
[[single-topic-fixed-delay]]
73-
== Single Topic for Fixed Delay Retries
72+
[[topic-strategies-for-fixed-delay]]
73+
== Topic Strategies for Fixed Delay Retries
7474

7575
If you're using fixed delay policies such as `FixedBackOffPolicy` or `NoBackOffPolicy` you can use a single topic to accomplish the non-blocking retries.
7676
This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended.
@@ -79,39 +79,64 @@ NOTE: The previous `FixedDelayStrategy` is now deprecated, and can be replaced b
7979

8080
[source, java]
8181
----
82-
@RetryableTopic(backoff = @Backoff(2_000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
82+
@RetryableTopic(
83+
backoff = @Backoff(2_000),
84+
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC // default
85+
)
8386
@KafkaListener(topics = "my-annotated-topic")
8487
public void processMessage(MyPojo message) {
8588
// ... message processing
8689
}
8790
----
8891

89-
[source, java]
92+
[source,java]
9093
----
9194
@Bean
9295
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
9396
return RetryTopicConfigurationBuilder
9497
.newInstance()
95-
.fixedBackOff(3_000)
96-
.maxAttempts(5)
97-
.useSingleTopicForFixedDelays()
98+
.fixedBackOff(2_000)
99+
.sameIntervalTopicReuseStrategy(SameIntervalTopicReuseStrategy.SINGLE_TOPIC) // default
98100
.create(template);
99101
}
100102
----
101103

102-
NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index values: retry-0, retry-1, ...
104+
If multiple topics are required, then that can be done using the following configuration.
103105

106+
[source, java]
107+
----
108+
@RetryableTopic(
109+
backoff = @Backoff(3_000),
110+
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS
111+
)
112+
@KafkaListener(topics = "my-annotated-topic")
113+
public void processMessage(MyPojo message) {
114+
// ... message processing
115+
}
116+
----
104117

105-
[[single-topic-maxinterval-delay]]
106-
== Single Topic for maxInterval Exponential Delay
118+
[source,java]
119+
----
120+
@Bean
121+
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
122+
return RetryTopicConfigurationBuilder
123+
.newInstance()
124+
.fixedBackOff(3_000)
125+
.sameIntervalTopicReuseStrategy(SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
126+
.create(template);
127+
}
128+
----
129+
130+
[[topic-strategies-for-maxinterval-delay]]
131+
== Topic Strategies for maxInterval Exponential Delay
107132

108133
If you're using exponential backoff policy (`ExponentialBackOffPolicy`), you can use a single retry topic to accomplish the non-blocking retries of the attempts whose delays are the configured `maxInterval`.
109134

110135
This "final" retry topic will be suffixed with the provided or default suffix, and will have either the index or the `maxInterval` value appended.
111136

112137
NOTE: By opting to use a single topic for the retries with the `maxInterval` delay, it may become more viable to configure an exponential retry policy that keeps retrying for a long time, because in this approach you do not need a large amount of topics.
113138

114-
Starting 3.2, the default behavior is reuses the retry topic for the same intervals, when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topic reuses for the same intervals(corresponding to the `maxInterval` delay).
139+
Starting with version 3.2, the default behavior is to reuse the retry topic for the same intervals. When using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topic being reused for the same intervals(corresponding to the `maxInterval` delay).
115140

116141
For instance, when configuring the exponential backoff with `initialInterval=1_000`, `multiplier=2`, and `maxInterval=16_000`, in order to keep trying for one hour, one would need to configure `maxAttempts` as 229, and by default the needed retry topics would be:
117142

@@ -151,10 +176,10 @@ public void processMessage(MyPojo message) {
151176
@Bean
152177
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
153178
return RetryTopicConfigurationBuilder
154-
.newInstance()
179+
.newInstance()
155180
.exponentialBackoff(1_000, 2, 16_000)
156181
.maxAttempts(230)
157-
.useSingleTopicForSameIntervals()
182+
.sameIntervalTopicReuseStrategy(SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
158183
.create(template);
159184
}
160185
----

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2024 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -46,6 +46,7 @@
4646
* @author Gary Russell
4747
* @author Adrian Chlebosz
4848
* @author Wang Zhiyang
49+
* @author Borahm Lee
4950
*
5051
* @since 2.7
5152
*
@@ -88,7 +89,7 @@ public class RetryTopicConfigurationBuilder {
8889

8990
private TopicSuffixingStrategy topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE;
9091

91-
private SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS;
92+
private SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC;
9293

9394
@Nullable
9495
private Boolean autoStartDltHandler;
@@ -255,10 +256,6 @@ public RetryTopicConfigurationBuilder setTopicSuffixingStrategy(TopicSuffixingSt
255256
/**
256257
* Configure the {@link SameIntervalTopicReuseStrategy}.
257258
*
258-
* <p>Note: for fixed backoffs, when this is configured as
259-
* {@link SameIntervalTopicReuseStrategy#SINGLE_TOPIC}, it has precedence over
260-
* the configuration done through
261-
* {@link #useSingleTopicForSameIntervals()}.
262259
* @param sameIntervalTopicReuseStrategy the strategy.
263260
* @return the builder.
264261
* @since 3.0.4
@@ -279,6 +276,7 @@ public RetryTopicConfigurationBuilder sameIntervalTopicReuseStrategy(SameInterva
279276
* @since 3.0.4
280277
* @see SameIntervalTopicReuseStrategy
281278
*/
279+
@Deprecated(since = "3.4", forRemoval = true)
282280
public RetryTopicConfigurationBuilder useSingleTopicForSameIntervals() {
283281
this.sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC;
284282
return this;

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 the original author or authors.
2+
* Copyright 2024-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -86,6 +86,7 @@
8686

8787
/**
8888
* @author Sanghyeok An
89+
* @author Borahm Lee
8990
* @since 3.3.0
9091
*/
9192

@@ -738,7 +739,6 @@ RetryTopicConfiguration firstRetryTopic(KafkaTemplate<String, String> template)
738739
.fixedBackOff(50)
739740
.maxAttempts(5)
740741
.concurrency(1)
741-
.useSingleTopicForSameIntervals()
742742
.includeTopic(FIRST_TOPIC)
743743
.doNotRetryOnDltFailure()
744744
.dltHandlerMethod("myCustomDltProcessor", DLT_METHOD_NAME)

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicScenarioTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 the original author or authors.
2+
* Copyright 2024-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -66,6 +66,7 @@
6666
/**
6767
* @author Sanghyeok An
6868
* @author Artem Bilan
69+
* @author Borahm Lee
6970
*
7071
* @since 3.3.0
7172
*/
@@ -1113,7 +1114,6 @@ static RetryTopicConfiguration createRetryTopicConfiguration(
11131114
.fixedBackOff(50)
11141115
.maxAttempts(maxAttempts)
11151116
.concurrency(1)
1116-
.useSingleTopicForSameIntervals()
11171117
.includeTopic(topicName)
11181118
.doNotRetryOnDltFailure()
11191119
.dltHandlerMethod(dltBeanName, DLT_METHOD_NAME)

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoFutureRetryTopicClassLevelIntegrationTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 the original author or authors.
2+
* Copyright 2024-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -86,6 +86,7 @@
8686

8787
/**
8888
* @author Sanghyeok An
89+
* @author Borahm Lee
8990
* @since 3.3.0
9091
*/
9192

@@ -731,7 +732,6 @@ RetryTopicConfiguration firstRetryTopic(KafkaTemplate<String, String> template)
731732
.fixedBackOff(50)
732733
.maxAttempts(5)
733734
.concurrency(1)
734-
.useSingleTopicForSameIntervals()
735735
.includeTopic(FIRST_TOPIC)
736736
.doNotRetryOnDltFailure()
737737
.dltHandlerMethod("myCustomDltProcessor", DLT_METHOD_NAME)

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoRetryTopicScenarioTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 the original author or authors.
2+
* Copyright 2024-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -64,6 +64,7 @@
6464

6565
/**
6666
* @author Sanghyeok An
67+
* @author Borahm Lee
6768
* @since 3.3.0
6869
*/
6970

@@ -1108,7 +1109,6 @@ static RetryTopicConfiguration createRetryTopicConfiguration(
11081109
.fixedBackOff(50)
11091110
.maxAttempts(maxAttempts)
11101111
.concurrency(1)
1111-
.useSingleTopicForSameIntervals()
11121112
.includeTopic(topicName)
11131113
.doNotRetryOnDltFailure()
11141114
.dltHandlerMethod(dltBeanName, DLT_METHOD_NAME)

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicClassLevelIntegrationTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2024 the original author or authors.
2+
* Copyright 2021-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -95,6 +95,7 @@
9595
* @author Wang Zhiyang
9696
* @author Artem Bilan
9797
* @author Sanghyeok An
98+
* @author Borahm Lee
9899
*
99100
* @since 3.2
100101
*/
@@ -625,7 +626,6 @@ RetryTopicConfiguration firstRetryTopic(KafkaTemplate<String, String> template)
625626
.fixedBackOff(50)
626627
.maxAttempts(5)
627628
.concurrency(1)
628-
.useSingleTopicForSameIntervals()
629629
.includeTopic(FIRST_TOPIC)
630630
.doNotRetryOnDltFailure()
631631
.dltHandlerMethod("myCustomDltProcessor", DLT_METHOD_NAME)

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilderTests.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2024 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,6 +37,7 @@
3737
/**
3838
* @author Tomaz Fernandes
3939
* @author Adrian Chlebosz
40+
* @author Borahm Lee
4041
* @since 2.7
4142
*/
4243
@ExtendWith(MockitoExtension.class)
@@ -81,9 +82,10 @@ void shouldSetFixedBackOffPolicy() {
8182
// then
8283
List<DestinationTopic.Properties> destinationTopicProperties = configuration.getDestinationTopicProperties();
8384
assertThat(destinationTopicProperties.get(0).delay()).isEqualTo(0);
85+
assertThat(destinationTopicProperties.get(1).isRetryTopic()).isTrue();
8486
assertThat(destinationTopicProperties.get(1).delay()).isEqualTo(1000);
85-
assertThat(destinationTopicProperties.get(2).delay()).isEqualTo(1000);
86-
assertThat(destinationTopicProperties.get(3).delay()).isEqualTo(0);
87+
assertThat(destinationTopicProperties.get(2).isDltTopic()).isTrue();
88+
assertThat(destinationTopicProperties.get(2).delay()).isEqualTo(0);
8789
}
8890

8991
@Test
@@ -99,11 +101,10 @@ void shouldSetNoBackoffPolicy() {
99101
// then
100102
List<DestinationTopic.Properties> destinationTopicProperties = configuration.getDestinationTopicProperties();
101103
assertThat(destinationTopicProperties.get(0).delay()).isEqualTo(0);
104+
assertThat(destinationTopicProperties.get(1).isRetryTopic()).isTrue();
102105
assertThat(destinationTopicProperties.get(1).delay()).isEqualTo(0);
106+
assertThat(destinationTopicProperties.get(2).isDltTopic()).isTrue();
103107
assertThat(destinationTopicProperties.get(2).delay()).isEqualTo(0);
104-
assertThat(destinationTopicProperties.get(3).delay()).isEqualTo(0);
105-
106-
107108
}
108109

109110
@Test
@@ -203,7 +204,7 @@ void shouldSetDltRoutingRules() {
203204
.create(kafkaOperations);
204205

205206
// then
206-
DestinationTopic.Properties desExcDltProps = configuration.getDestinationTopicProperties().get(3);
207+
DestinationTopic.Properties desExcDltProps = configuration.getDestinationTopicProperties().get(2);
207208
assertThat(desExcDltProps.suffix()).isEqualTo("-deserialization-dlt");
208209
assertThat(desExcDltProps.usedForExceptions()).containsExactly(DeserializationException.class);
209210
}

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2024 the original author or authors.
2+
* Copyright 2021-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -95,6 +95,7 @@
9595
* @author Gary Russell
9696
* @author Wang Zhiyang
9797
* @author Sanghyeok An
98+
* @author Borahm Lee
9899
* @since 2.7
99100
*/
100101
@SpringJUnitConfig
@@ -652,7 +653,6 @@ public RetryTopicConfiguration firstRetryTopic(KafkaTemplate<String, String> tem
652653
.fixedBackOff(50)
653654
.maxAttempts(5)
654655
.concurrency(1)
655-
.useSingleTopicForSameIntervals()
656656
.includeTopic(FIRST_TOPIC)
657657
.doNotRetryOnDltFailure()
658658
.dltHandlerMethod("myCustomDltProcessor", DLT_METHOD_NAME)

0 commit comments

Comments
 (0)