Skip to content

Commit 4fedd75

Browse files
authored
GH-2443: Distinct Concurrency for Retry Containers
Resolves #2443 Optionally configure a different concurrency for retry containers. * Add Javadocs to RetryTopicConfigurationBuilder. * Javadoc polishing.
1 parent 62300bb commit 4fedd75

File tree

8 files changed

+279
-18
lines changed

8 files changed

+279
-18
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

+4
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ NOTE: It is not necessary to also add `@EnableKafka`, if you add this annotation
4646
Also, starting with that version, for more advanced configuration of the feature's components and global features, the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the appropriate methods overridden.
4747
For more details refer to <<retry-topic-global-settings>>.
4848

49+
By default, the containers for the retry topics will have the same concurrency as the main container.
50+
Starting with version 3.0, you can set a different `concurrency` for the retry containers (either on the annotation, or in `RetryConfigurationBuilder`).
51+
4952
IMPORTANT: Only one of the above techniques can be used, and only one `@Configuration` class can extend `RetryTopicConfigurationSupport`.
5053

5154
===== Using the `@RetryableTopic` annotation
@@ -108,6 +111,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> templa
108111
.newInstance()
109112
.fixedBackoff(3000)
110113
.maxAttempts(5)
114+
.concurrency(1)
111115
.includeTopics("my-topic", "my-other-topic")
112116
.create(template);
113117
}

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

+3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ See <<same-broker-multiple-tests>> for more information.
3535
This feature is no longer considered experimental (as far as its API is concerned), the feature itself has been supported since 2.7, but with a greater than normal possibility of breaking API changes.
3636

3737
The bootstrapping of <<retry-topic>> infrastructure beans has changed in this release to avoid some timing problems that occurred in some application regarding application initialization.
38+
39+
You can now set a different `concurrency` for the retry containers; by default, the concurrency is the same as the main container.
40+
3841
See <<retry-config>> for more information.
3942

4043
[[x30-lc-changes]]

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java

+8
Original file line numberDiff line numberDiff line change
@@ -194,4 +194,12 @@
194194
*/
195195
String autoStartDltHandler() default "";
196196

197+
/**
198+
* Concurrency for the retry and DLT containers; if not specified, the main container
199+
* concurrency is used.
200+
* @return the concurrency.
201+
* @since 3.0
202+
*/
203+
String concurrency() default "";
204+
197205
}

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java

+1
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
129129
}
130130
return RetryTopicConfigurationBuilder.newInstance()
131131
.maxAttempts(resolveExpressionAsInteger(annotation.attempts(), "attempts", true))
132+
.concurrency(resolveExpressionAsInteger(annotation.concurrency(), "concurrency", false))
132133
.customBackoff(createBackoffFromAnnotation(annotation.backoff(), this.beanFactory))
133134
.retryTopicSuffix(resolveExpressionAsString(annotation.retryTopicSuffix(), "retryTopicSuffix"))
134135
.dltSuffix(resolveExpressionAsString(annotation.dltTopicSuffix(), "dltTopicSuffix"))

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfiguration.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020

2121
import org.springframework.kafka.support.AllowDenyCollectionManager;
2222
import org.springframework.kafka.support.EndpointHandlerMethod;
23+
import org.springframework.lang.Nullable;
2324

2425
/**
2526
* Contains the provided configuration for the retryable topics.
@@ -45,18 +46,23 @@ public class RetryTopicConfiguration {
4546

4647
private final ListenerContainerFactoryConfigurer.Configuration factoryConfigurerConfig;
4748

49+
@Nullable
50+
private final Integer concurrency;
51+
4852
RetryTopicConfiguration(List<DestinationTopic.Properties> destinationTopicProperties,
4953
EndpointHandlerMethod dltHandlerMethod,
5054
TopicCreation kafkaTopicAutoCreationConfig,
5155
AllowDenyCollectionManager<String> topicAllowListManager,
5256
ListenerContainerFactoryResolver.Configuration factoryResolverConfig,
53-
ListenerContainerFactoryConfigurer.Configuration factoryConfigurerConfig) {
57+
ListenerContainerFactoryConfigurer.Configuration factoryConfigurerConfig,
58+
@Nullable Integer concurrency) {
5459
this.destinationTopicProperties = destinationTopicProperties;
5560
this.dltHandlerMethod = dltHandlerMethod;
5661
this.kafkaTopicAutoCreationConfig = kafkaTopicAutoCreationConfig;
5762
this.topicAllowListManager = topicAllowListManager;
5863
this.factoryResolverConfig = factoryResolverConfig;
5964
this.factoryConfigurerConfig = factoryConfigurerConfig;
65+
this.concurrency = concurrency;
6066
}
6167

6268
public boolean hasConfigurationForTopics(String[] topics) {
@@ -83,6 +89,11 @@ public List<DestinationTopic.Properties> getDestinationTopicProperties() {
8389
return this.destinationTopicProperties;
8490
}
8591

92+
@Nullable
93+
public Integer getConcurrency() {
94+
return this.concurrency;
95+
}
96+
8697
static class TopicCreation {
8798

8899
private final boolean shouldCreateTopics;

0 commit comments

Comments
 (0)