Skip to content

Commit b88cbf9

Browse files
garyrussellartembilan
authored andcommitted
GH-2239: RetryableTopic Refactoring
Resolves #2239 * GH-2239: Replace PartitionPausingBackOffManager New back of manager (and factory) that uses a task scheduler to resume the paused partitions. Revert change to deprecated PartitionPausingBackoffManager. Log resume. * Remove legacy code. Also fix unrelated race in EKIT. Only allow one `RetryTemplateConfigurationSupport` bean. * Fix static var. * Docs. * More docs. * Remove more dead/deprecated code. * Address PR Comments. * Fix RetryTopicConfigurer bean retrieval. * Remove unnecessary casts in doc. # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java # spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicInternalBeanNames.java # spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java
1 parent 131b0e7 commit b88cbf9

File tree

43 files changed

+602
-2405
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+602
-2405
lines changed

spring-kafka-docs/src/main/asciidoc/index.adoc

+2-2
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ The <<kafka,main chapter>> covers the core classes to develop a Kafka applicatio
4747

4848
include::kafka.adoc[]
4949

50+
include::retrytopic.adoc[]
51+
5052
include::streams.adoc[]
5153

5254
include::testing.adoc[]
5355

54-
include::retrytopic.adoc[]
55-
5656
[[tips-n-tricks]]
5757
== Tips, Tricks and Examples
5858

spring-kafka-docs/src/main/asciidoc/kafka.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -5137,7 +5137,7 @@ This new error handler replaces the `SeekToCurrentErrorHandler` and `RecoveringB
51375137
One difference is that the fallback behavior for batch listeners (when an exception other than a `BatchListenerFailedException` is thrown) is the equivalent of the <<retrying-batch-eh>>.
51385138

51395139
IMPORTANT: Starting with version 2.9, the `DefaultErrorHandler` can be configured to provide the same semantics as seeking the unprocessed record offsets as discussed below, but without actually seeking.
5140-
Instead, the records are retained by the listener container and resubmitted to the listener after the error handler exits (and after performing a single paused `poll()`, to keep the consumer alive).
5140+
Instead, the records are retained by the listener container and resubmitted to the listener after the error handler exits (and after performing a single paused `poll()`, to keep the consumer alive; if <<retry-topic>> or a `ContainerPausingBackOffHandler` are being used, the pause may extend over multiple polls).
51415141
The error handler returns a result to the container that indicates whether the current failing record can be resubmitted, or if it was recovered and then it will not be sent to the listener again.
51425142
To enable this mode, set the property `seekAfterError` to `false`.
51435143

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

+29-21
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ IMPORTANT: This is an experimental feature and the usual rule of no breaking API
55
Users are encouraged to try out the feature and provide feedback via GitHub Issues or GitHub discussions.
66
This is regarding the API only; the feature is considered to be complete, and robust.
77

8+
Version 2.9 changed the mechanism to bootstrap infrastructure beans; see <<retry-config>> for the two mechanisms that are now required to bootstrap the feature.
9+
10+
After these changes, we are intending to remove the experimental designation, probably in version 3.0.
11+
812
Achieving non-blocking retry / dlt functionality with Kafka usually requires setting up extra topics and creating and configuring the corresponding listeners.
913
Since 2.7 Spring for Apache Kafka offers support for that via the `@RetryableTopic` annotation and `RetryTopicConfiguration` class to simplify that bootstrapping.
1014

@@ -33,28 +37,23 @@ If one message's processing takes longer than the next message's back off period
3337
Also, for short delays (about 1s or less), the maintenance work the thread has to do, such as committing offsets, may delay the message processing execution.
3438
The precision can also be affected if the retry topic's consumer is handling more than one partition, because we rely on waking up the consumer from polling and having full pollTimeouts to make timing adjustments.
3539

36-
That being said, for consumers handling a single partition the message's processing should happen under 100ms after it's exact due time for most situations.
40+
That being said, for consumers handling a single partition the message's processing should occur approximately at its exact due time for most situations.
3741

3842
IMPORTANT: It is guaranteed that a message will never be processed before its due time.
3943

40-
===== Tuning the Delay Precision
41-
42-
The message's processing delay precision relies on two `ContainerProperties`: `ContainerProperties.pollTimeout` and `ContainerProperties.idlePartitionEventInterval`.
43-
Both properties will be automatically set in the retry topic and dlt's `ListenerContainerFactory` to one quarter of the smallest delay value for that topic, with a minimum value of 250ms and a maximum value of 5000ms.
44-
These values will only be set if the property has its default values - if you change either value yourself your change will not be overridden.
45-
This way you can tune the precision and performance for the retry topics if you need to.
46-
47-
NOTE: You can have separate `ListenerContainerFactory` instances for the main and retry topics - this way you can have different settings to better suit your needs, such as having a higher polling timeout setting for the main topics and a lower one for the retry topics.
48-
44+
[[retry-config]]
4945
==== Configuration
5046

51-
Starting with version 2.9, the `@EnableKafkaRetryTopic` annotation should be used in a `@Configuration` annotated class.
47+
Starting with version 2.9, for default configuration, the `@EnableKafkaRetryTopic` annotation should be used in a `@Configuration` annotated class.
5248
This enables the feature to bootstrap properly and gives access to injecting some of the feature's components to be looked up at runtime.
53-
Also, to configure the feature's components and global features, the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the appropriate methods overridden.
54-
For more details refer to <<retry-topic-global-settings>>.
5549

5650
NOTE: It is not necessary to also add `@EnableKafka`, if you add this annotation, because `@EnableKafkaRetryTopic` is meta-annotated with `@EnableKafka`.
5751

52+
Also, starting with that version, for more advanced configuration of the feature's components and global features, the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the appropriate methods overridden.
53+
For more details refer to <<retry-topic-global-settings>>.
54+
55+
IMPORTANT: Only one of the above techniques can be used, and only one `@Configuration` class can extend `RetryTopicConfigurationSupport`.
56+
5857
===== Using the `@RetryableTopic` annotation
5958

6059
To configure the retry topic and dlt for a `@KafkaListener` annotated method, you just have to add the `@RetryableTopic` annotation to it and Spring for Apache Kafka will bootstrap all the necessary topics and consumers with the default configurations.
@@ -161,9 +160,9 @@ It's best to use a single `RetryTopicConfiguration` bean for configuration of su
161160
[[retry-topic-global-settings]]
162161
===== Configuring Global Settings and Features
163162

164-
Since 2.9, the previous bean overriding approach for configuring components has been deprecated.
165-
This does not change the `RetryTopicConfiguration` beans approach - only components' configurations.
166-
Now the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the proper methods overridden.
163+
Since 2.9, the previous bean overriding approach for configuring components has been removed (without deprecation, due to the aforementioned experimental nature of the API).
164+
This does not change the `RetryTopicConfiguration` beans approach - only infrastructure components' configurations.
165+
Now the `RetryTopicConfigurationSupport` class should be extended in a (single) `@Configuration` class, and the proper methods overridden.
167166
An example follows:
168167

169168
====
@@ -185,6 +184,15 @@ public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
185184
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
186185
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
187186
}
187+
188+
@Override
189+
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
190+
// Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
191+
customizersConfigurer.customizeErrorHandler(eh -> {
192+
eh.setSeekAfterError(false);
193+
});
194+
}
195+
188196
}
189197
----
190198
====
@@ -629,7 +637,7 @@ As an example the following implementation, in addition to the standard suffix,
629637
----
630638
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
631639
632-
@Override
640+
@Override
633641
public RetryTopicNamesProvider createRetryTopicNamesProvider(
634642
DestinationTopic.Properties properties) {
635643
@@ -728,7 +736,7 @@ In the latter the consumer ends the execution without forwarding the message.
728736
----
729737
730738
@RetryableTopic(dltProcessingFailureStrategy =
731-
DltStrategy.FAIL_ON_ERROR)
739+
DltStrategy.FAIL_ON_ERROR)
732740
@KafkaListener(topics = "my-annotated-topic")
733741
public void processMessage(MyPojo message) {
734742
// ... message processing
@@ -777,7 +785,7 @@ In this case after retrials are exhausted the processing simply ends.
777785
----
778786
779787
@RetryableTopic(dltProcessingFailureStrategy =
780-
DltStrategy.NO_DLT)
788+
DltStrategy.NO_DLT)
781789
@KafkaListener(topics = "my-annotated-topic")
782790
public void processMessage(MyPojo message) {
783791
// ... message processing
@@ -872,8 +880,8 @@ For example, to change the logging level to WARN you might add:
872880
----
873881
@Override
874882
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
875-
customizersConfigurer.customizeErrorHandler(commonErrorHandler ->
876-
((DefaultErrorHandler) commonErrorHandler).setLogLevel(KafkaException.Level.WARN))
883+
customizersConfigurer.customizeErrorHandler(defaultErrorHandler ->
884+
defaultErrorHandler.setLogLevel(KafkaException.Level.WARN))
877885
}
878886
----
879887
====

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

+13-22
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@
5858
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
5959
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
6060
import org.springframework.beans.factory.config.Scope;
61-
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
62-
import org.springframework.beans.factory.support.RootBeanDefinition;
6361
import org.springframework.context.ApplicationContext;
6462
import org.springframework.context.ApplicationContextAware;
6563
import org.springframework.context.ConfigurableApplicationContext;
@@ -188,6 +186,8 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
188186

189187
private AnnotationEnhancer enhancer;
190188

189+
private RetryTopicConfigurer retryTopicConfigurer;
190+
191191
@Override
192192
public int getOrder() {
193193
return LOWEST_PRECEDENCE;
@@ -510,27 +510,18 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
510510
}
511511

512512
private RetryTopicConfigurer getRetryTopicConfigurer() {
513-
bootstrapRetryTopicIfNecessary();
514-
return this.beanFactory.containsBean("internalRetryTopicConfigurer")
515-
? this.beanFactory.getBean("internalRetryTopicConfigurer", RetryTopicConfigurer.class)
516-
: this.beanFactory.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class);
517-
}
518-
519-
@SuppressWarnings("deprecation")
520-
private void bootstrapRetryTopicIfNecessary() {
521-
if (!(this.beanFactory instanceof BeanDefinitionRegistry)) {
522-
throw new IllegalStateException("BeanFactory must be an instance of "
523-
+ BeanDefinitionRegistry.class.getSimpleName()
524-
+ " to bootstrap the RetryTopic functionality. Provided beanFactory: "
525-
+ this.beanFactory.getClass().getSimpleName());
526-
}
527-
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) this.beanFactory;
528-
if (!registry.containsBeanDefinition("internalRetryTopicBootstrapper")) {
529-
registry.registerBeanDefinition("internalRetryTopicBootstrapper",
530-
new RootBeanDefinition(org.springframework.kafka.retrytopic.RetryTopicBootstrapper.class));
531-
this.beanFactory.getBean("internalRetryTopicBootstrapper",
532-
org.springframework.kafka.retrytopic.RetryTopicBootstrapper.class).bootstrapRetryTopic();
513+
if (this.retryTopicConfigurer == null) {
514+
try {
515+
this.retryTopicConfigurer = this.beanFactory
516+
.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class);
517+
}
518+
catch (NoSuchBeanDefinitionException ex) {
519+
this.logger.error("A 'RetryTopicConfigurer' with name "
520+
+ RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME + "is required.");
521+
throw ex;
522+
}
533523
}
524+
return this.retryTopicConfigurer;
534525
}
535526

536527
private Method checkProxy(Method methodArg, Object bean) {

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java

+7-20
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,6 @@ public class RetryableTopicAnnotationProcessor {
7676

7777
private final BeanExpressionContext expressionContext;
7878

79-
private static final String DEFAULT_SPRING_BOOT_KAFKA_TEMPLATE_NAME = "kafkaTemplate";
80-
8179
/**
8280
* Construct an instance using the provided parameters and default resolver,
8381
* expression context.
@@ -214,26 +212,15 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean
214212
}
215213
}
216214
try {
217-
return this.beanFactory.getBean(
218-
org.springframework.kafka.retrytopic.RetryTopicInternalBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME,
215+
return this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME,
219216
KafkaOperations.class);
220217
}
221-
catch (NoSuchBeanDefinitionException ex) {
222-
try {
223-
return this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME,
224-
KafkaOperations.class);
225-
}
226-
catch (NoSuchBeanDefinitionException ex2) {
227-
try {
228-
return this.beanFactory.getBean(DEFAULT_SPRING_BOOT_KAFKA_TEMPLATE_NAME, KafkaOperations.class);
229-
}
230-
catch (NoSuchBeanDefinitionException exc) {
231-
exc.addSuppressed(ex);
232-
exc.addSuppressed(ex2);
233-
throw new BeanInitializationException("Could not find a KafkaTemplate to configure the retry topics.", // NOSONAR (lost stack trace)
234-
exc);
235-
}
236-
}
218+
catch (NoSuchBeanDefinitionException ex2) {
219+
KafkaOperations<?, ?> kafkaOps = this.beanFactory.getBeanProvider(KafkaOperations.class).getIfUnique();
220+
Assert.state(kafkaOps != null, () -> "A single KafkaTemplate bean could not be found in the context; "
221+
+ " a single instance must exist, or one specifically named "
222+
+ RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME);
223+
return kafkaOps;
237224
}
238225
}
239226

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

+21-1
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public class KafkaListenerEndpointRegistry implements ListenerContainerRegistry,
7575

7676
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); //NOSONAR
7777

78+
private final Map<String, MessageListenerContainer> unregisteredContainers = new ConcurrentHashMap<>();
79+
7880
private final Map<String, MessageListenerContainer> listenerContainers = new ConcurrentHashMap<>();
7981

8082
private int phase = AbstractMessageListenerContainer.DEFAULT_PHASE;
@@ -109,6 +111,17 @@ public MessageListenerContainer getListenerContainer(String id) {
109111
return this.listenerContainers.get(id);
110112
}
111113

114+
@Override
115+
@Nullable
116+
public MessageListenerContainer getUnregisteredListenerContainer(String id) {
117+
MessageListenerContainer container = this.unregisteredContainers.get(id);
118+
if (container == null) {
119+
refreshContextContainers();
120+
return this.unregisteredContainers.get(id);
121+
}
122+
return null;
123+
}
124+
112125
/**
113126
* By default, containers registered for endpoints after the context is refreshed
114127
* are immediately started, regardless of their autoStartup property, to comply with
@@ -156,10 +169,17 @@ public Collection<MessageListenerContainer> getListenerContainers() {
156169
public Collection<MessageListenerContainer> getAllListenerContainers() {
157170
List<MessageListenerContainer> containers = new ArrayList<>();
158171
containers.addAll(getListenerContainers());
159-
containers.addAll(this.applicationContext.getBeansOfType(MessageListenerContainer.class, true, false).values());
172+
refreshContextContainers();
173+
containers.addAll(this.unregisteredContainers.values());
160174
return containers;
161175
}
162176

177+
private void refreshContextContainers() {
178+
this.unregisteredContainers.clear();
179+
this.applicationContext.getBeansOfType(MessageListenerContainer.class, true, false).values()
180+
.forEach(container -> this.unregisteredContainers.put(container.getListenerId(), container));
181+
}
182+
163183
/**
164184
* Create a message listener container for the given {@link KafkaListenerEndpoint}.
165185
* <p>This create the necessary infrastructure to honor that endpoint

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaBackOffManagerFactory.java

+10-9
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
* Base class for {@link KafkaBackOffManagerFactory} implementations.
2626
*
2727
* @author Tomaz Fernandes
28+
* @author Gary Russell
2829
* @since 2.7
2930
* @see KafkaConsumerBackoffManager
3031
*/
@@ -35,24 +36,23 @@ public abstract class AbstractKafkaBackOffManagerFactory
3536

3637
private ListenerContainerRegistry listenerContainerRegistry;
3738

39+
/**
40+
* Creates an instance that will retrieve the {@link ListenerContainerRegistry} from
41+
* the {@link ApplicationContext}.
42+
*/
43+
public AbstractKafkaBackOffManagerFactory() {
44+
this.listenerContainerRegistry = null;
45+
}
46+
3847
/**
3948
* Creates an instance with the provided {@link ListenerContainerRegistry},
4049
* which will be used to fetch the {@link MessageListenerContainer} to back off.
41-
4250
* @param listenerContainerRegistry the listenerContainerRegistry to use.
4351
*/
4452
public AbstractKafkaBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry) {
4553
this.listenerContainerRegistry = listenerContainerRegistry;
4654
}
4755

48-
/**
49-
* Creates an instance that will retrieve the {@link ListenerContainerRegistry} from
50-
* the {@link ApplicationContext}.
51-
*/
52-
public AbstractKafkaBackOffManagerFactory() {
53-
this.listenerContainerRegistry = null;
54-
}
55-
5656
/**
5757
* Sets the {@link ListenerContainerRegistry}, that will be used to fetch the
5858
* {@link MessageListenerContainer} to back off.
@@ -90,4 +90,5 @@ protected <T> T getBean(String beanName, Class<T> beanClass) {
9090
public void setApplicationContext(ApplicationContext applicationContext) {
9191
this.applicationContext = applicationContext;
9292
}
93+
9394
}

spring-kafka/src/main/java/org/springframework/kafka/listener/BackOffHandler.java

+21-4
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,18 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import org.apache.kafka.common.TopicPartition;
20+
1921
import org.springframework.lang.Nullable;
2022

2123
/**
2224
* Handler for the provided back off time, listener container and exception.
25+
* Also supports back off for individual partitions.
2326
*
24-
* @author Jan Marincek
25-
* @since 2.9
27+
* @author Jan Marincek
28+
* @author Gary Russell
29+
* @since 2.9
2630
*/
27-
@FunctionalInterface
2831
public interface BackOffHandler {
2932

3033
/**
@@ -33,6 +36,20 @@ public interface BackOffHandler {
3336
* @param exception the exception.
3437
* @param nextBackOff the next back off.
3538
*/
36-
void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff);
39+
default void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff) {
40+
throw new UnsupportedOperationException();
41+
}
42+
43+
/**
44+
* Perform the next back off for a partition.
45+
* @param container the container.
46+
* @param partition the partition.
47+
* @param nextBackOff the next back off.
48+
*/
49+
default void onNextBackOff(@Nullable MessageListenerContainer container, TopicPartition partition,
50+
long nextBackOff) {
51+
52+
throw new UnsupportedOperationException();
53+
}
3754

3855
}

0 commit comments

Comments
 (0)