Skip to content

Commit 2f50088

Browse files
garyrussellartembilan
authored andcommitted
GH-2239: Fix Boot AutoConfiguration
See #2239 The previous commit removed the implicit bootstrap in favor of enforcing the user to use the `@EnableKafkaRretryTopic` or explicitly extend `RetryTopicConfigurationSupport`. Unfortunately, this breaks Spring Boot because it can auto configure a `RetryTopicConfiguration` bean, which means the infrastructure beans are required. Fallback to late binding of the infrastructure beans if a `RetryTopicConfiguration` bean is found in the application context. Tested with a Boot app. **cherry-pick to main**
1 parent edfbd1d commit 2f50088

File tree

7 files changed

+86
-14
lines changed

7 files changed

+86
-14
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

+46-3
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.springframework.context.ApplicationContextAware;
6363
import org.springframework.context.ConfigurableApplicationContext;
6464
import org.springframework.context.expression.StandardBeanExpressionResolver;
65+
import org.springframework.context.support.GenericApplicationContext;
6566
import org.springframework.core.MethodIntrospector;
6667
import org.springframework.core.OrderComparator;
6768
import org.springframework.core.Ordered;
@@ -83,11 +84,15 @@
8384
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
8485
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
8586
import org.springframework.kafka.listener.ContainerGroupSequencer;
87+
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
8688
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
8789
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
90+
import org.springframework.kafka.retrytopic.DestinationTopicResolver;
8891
import org.springframework.kafka.retrytopic.RetryTopicBeanNames;
8992
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
93+
import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport;
9094
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
95+
import org.springframework.kafka.retrytopic.RetryTopicSchedulerWrapper;
9196
import org.springframework.kafka.support.TopicPartitionOffset;
9297
import org.springframework.lang.Nullable;
9398
import org.springframework.messaging.converter.GenericMessageConverter;
@@ -96,6 +101,8 @@
96101
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
97102
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
98103
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
104+
import org.springframework.scheduling.TaskScheduler;
105+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
99106
import org.springframework.util.Assert;
100107
import org.springframework.util.ReflectionUtils;
101108
import org.springframework.util.StringUtils;
@@ -516,14 +523,50 @@ private RetryTopicConfigurer getRetryTopicConfigurer() {
516523
.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class);
517524
}
518525
catch (NoSuchBeanDefinitionException ex) {
519-
this.logger.error("A 'RetryTopicConfigurer' with name "
520-
+ RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME + "is required.");
521-
throw ex;
526+
this.retryTopicConfigurer = createDefaultConfigurer();
522527
}
523528
}
524529
return this.retryTopicConfigurer;
525530
}
526531

532+
private RetryTopicConfigurer createDefaultConfigurer() {
533+
if (this.applicationContext instanceof GenericApplicationContext) {
534+
GenericApplicationContext gac = (GenericApplicationContext) this.applicationContext;
535+
gac.registerBean(
536+
RetryTopicBeanNames.DEFAULT_RETRY_TOPIC_CONFIG_SUPPORT_BEAN_NAME,
537+
RetryTopicConfigurationSupport.class,
538+
() -> new RetryTopicConfigurationSupport());
539+
RetryTopicConfigurationSupport rtcs = this.applicationContext.getBean(
540+
RetryTopicBeanNames.DEFAULT_RETRY_TOPIC_CONFIG_SUPPORT_BEAN_NAME,
541+
RetryTopicConfigurationSupport.class);
542+
DestinationTopicResolver destResolver = rtcs.destinationTopicResolver();
543+
RetryTopicSchedulerWrapper schedW = gac.getBeanProvider(RetryTopicSchedulerWrapper.class).getIfUnique();
544+
TaskScheduler sched = gac.getBeanProvider(TaskScheduler.class).getIfUnique();
545+
if (schedW == null && sched == null) {
546+
RetryTopicSchedulerWrapper newSchedW = new RetryTopicSchedulerWrapper(new ThreadPoolTaskScheduler());
547+
gac.registerBean(RetryTopicBeanNames.DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME,
548+
RetryTopicSchedulerWrapper.class, () -> newSchedW);
549+
schedW = gac.getBean(RetryTopicSchedulerWrapper.class);
550+
}
551+
KafkaConsumerBackoffManager bom =
552+
rtcs.kafkaConsumerBackoffManager(this.applicationContext, this.registrar.getEndpointRegistry(),
553+
schedW, sched);
554+
RetryTopicConfigurer rtc = rtcs.retryTopicConfigurer(bom, destResolver, this.beanFactory);
555+
556+
gac.registerBean(RetryTopicBeanNames.DESTINATION_TOPIC_RESOLVER_BEAN_NAME, DestinationTopicResolver.class,
557+
() -> destResolver);
558+
gac.registerBean(KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME,
559+
KafkaConsumerBackoffManager.class, () -> bom);
560+
gac.registerBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class,
561+
() -> rtc);
562+
563+
return this.beanFactory
564+
.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class);
565+
}
566+
throw new IllegalStateException("When there is no RetryTopicConfigurationSupport bean, the application context "
567+
+ "must be a GenericApplicationContext");
568+
}
569+
527570
private Method checkProxy(Method methodArg, Object bean) {
528571
Method method = methodArg;
529572
if (AopUtils.isJdkDynamicProxy(bean)) {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaBackOffManagerFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ protected <T> T getBean(String beanName, Class<T> beanClass) {
8787
}
8888

8989
@Override
90-
public void setApplicationContext(ApplicationContext applicationContext) {
90+
public final void setApplicationContext(ApplicationContext applicationContext) {
9191
this.applicationContext = applicationContext;
9292
}
9393

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManagerFactory.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import org.springframework.context.ApplicationContext;
1920
import org.springframework.util.Assert;
2021

2122
/**
@@ -32,9 +33,13 @@ public class ContainerPartitionPausingBackOffManagerFactory extends AbstractKafk
3233
/**
3334
* Construct an instance with the provided properties.
3435
* @param listenerContainerRegistry the registry.
36+
* @param applicationContext the application context.
3537
*/
36-
public ContainerPartitionPausingBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry) {
38+
public ContainerPartitionPausingBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry,
39+
ApplicationContext applicationContext) {
40+
3741
super(listenerContainerRegistry);
42+
setApplicationContext(applicationContext);
3843
}
3944

4045
@Override

Diff for: spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicBeanNames.java

+14
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,21 @@
1919
/**
2020
* The bean names for the non-blocking topic-based delayed retries feature.
2121
* @author Tomaz Fernandes
22+
* @author Gary Russell
2223
* @since 2.9
2324
*/
2425
public final class RetryTopicBeanNames {
2526

2627
private RetryTopicBeanNames() {
2728
}
2829

30+
/**
31+
* The bean name of an internally managed retry topic configuration support, if
32+
* needed.
33+
*/
34+
public static final String DEFAULT_RETRY_TOPIC_CONFIG_SUPPORT_BEAN_NAME =
35+
"org.springframework.kafka.retrytopic.internalRetryTopicConfigurationSupport";
36+
2937
/**
3038
* The bean name of the internally managed retry topic configurer.
3139
*/
@@ -50,4 +58,10 @@ private RetryTopicBeanNames() {
5058
public static final String DEFAULT_KAFKA_TEMPLATE_BEAN_NAME =
5159
"defaultRetryTopicKafkaTemplate";
5260

61+
/**
62+
* The bean name of the internally registered scheduler wrapper, if needed.
63+
*/
64+
public static final String DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME =
65+
"defaultRetryTopicKafkaTemplate";
66+
5367
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicComponentFactory.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.time.Clock;
2020

2121
import org.springframework.beans.factory.BeanFactory;
22+
import org.springframework.context.ApplicationContext;
2223
import org.springframework.kafka.config.KafkaListenerContainerFactory;
2324
import org.springframework.kafka.config.KafkaListenerEndpoint;
2425
import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManagerFactory;
@@ -150,10 +151,13 @@ public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
150151
* {@link KafkaConsumerBackoffManager} instance used to back off the partitions.
151152
* @param registry the {@link ListenerContainerRegistry} used to fetch the
152153
* {@link MessageListenerContainer}.
154+
* @param applicationContext the application context.
153155
* @return the instance.
154156
*/
155-
public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRegistry registry) {
156-
return new ContainerPartitionPausingBackOffManagerFactory(registry);
157+
public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRegistry registry,
158+
ApplicationContext applicationContext) {
159+
160+
return new ContainerPartitionPausingBackOffManagerFactory(registry, applicationContext);
157161
}
158162

159163
/**

Diff for: spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import org.springframework.beans.factory.BeanFactory;
2929
import org.springframework.beans.factory.annotation.Qualifier;
30+
import org.springframework.context.ApplicationContext;
3031
import org.springframework.context.annotation.Bean;
3132
import org.springframework.context.annotation.Configuration;
3233
import org.springframework.kafka.annotation.EnableKafkaRetryTopic;
@@ -72,7 +73,7 @@ public class RetryTopicConfigurationSupport {
7273

7374
private final RetryTopicComponentFactory componentFactory = createComponentFactory();
7475

75-
protected RetryTopicConfigurationSupport() {
76+
public RetryTopicConfigurationSupport() {
7677
Assert.state(ONLY_ONE_ALLOWED.getAndSet(false), "Only one 'RetryTopicConfigurationSupport' is allowed");
7778
}
7879

@@ -266,20 +267,21 @@ protected Consumer<DestinationTopicResolver> configureDestinationTopicResolver()
266267
* To provide a custom implementation, either override this method, or
267268
* override the {@link RetryTopicComponentFactory#kafkaBackOffManagerFactory} method
268269
* and return a different {@link KafkaBackOffManagerFactory}.
270+
* @param applicationContext the application context.
269271
* @param registry the {@link ListenerContainerRegistry} to be used to fetch the
270272
* {@link MessageListenerContainer} at runtime to be backed off.
271273
* @param wrapper a {@link RetryTopicSchedulerWrapper}.
272274
* @param taskScheduler a {@link TaskScheduler}.
273275
* @return the instance.
274276
*/
275277
@Bean(name = KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME)
276-
public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(
278+
public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(ApplicationContext applicationContext,
277279
@Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
278280
ListenerContainerRegistry registry, @Nullable RetryTopicSchedulerWrapper wrapper,
279281
@Nullable TaskScheduler taskScheduler) {
280282

281283
KafkaBackOffManagerFactory backOffManagerFactory =
282-
this.componentFactory.kafkaBackOffManagerFactory(registry);
284+
this.componentFactory.kafkaBackOffManagerFactory(registry, applicationContext);
283285
JavaUtils.INSTANCE.acceptIfInstanceOf(ContainerPartitionPausingBackOffManagerFactory.class, backOffManagerFactory,
284286
factory -> configurePartitionPausingFactory(factory, registry,
285287
wrapper != null ? wrapper.getScheduler() : taskScheduler));

Diff for: spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.mockito.ArgumentCaptor;
3636

3737
import org.springframework.beans.factory.BeanFactory;
38+
import org.springframework.context.ApplicationContext;
3839
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
3940
import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManagerFactory;
4041
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
@@ -167,7 +168,8 @@ void testCreateBackOffManager() {
167168
KafkaConsumerBackoffManager backoffManagerMock = mock(KafkaConsumerBackoffManager.class);
168169
TaskScheduler taskSchedulerMock = mock(TaskScheduler.class);
169170
Clock clock = mock(Clock.class);
170-
given(componentFactory.kafkaBackOffManagerFactory(registry)).willReturn(factory);
171+
ApplicationContext ctx = mock(ApplicationContext.class);
172+
given(componentFactory.kafkaBackOffManagerFactory(registry, ctx)).willReturn(factory);
171173
given(factory.create()).willReturn(backoffManagerMock);
172174
RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport() {
173175

@@ -177,19 +179,21 @@ protected RetryTopicComponentFactory createComponentFactory() {
177179
}
178180

179181
};
180-
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(registry, null,
182+
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, null,
181183
taskSchedulerMock);
182184
assertThat(backoffManager).isEqualTo(backoffManagerMock);
183-
then(componentFactory).should().kafkaBackOffManagerFactory(registry);
185+
then(componentFactory).should().kafkaBackOffManagerFactory(registry, ctx);
184186
then(factory).should().create();
185187
}
186188

187189
@Test
188190
void testCreateBackOffManagerNoConfiguration() {
189191
ListenerContainerRegistry registry = mock(ListenerContainerRegistry.class);
190192
TaskScheduler scheduler = mock(TaskScheduler.class);
193+
ApplicationContext ctx = mock(ApplicationContext.class);
191194
RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport();
192-
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(registry, null, scheduler);
195+
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, null,
196+
scheduler);
193197
assertThat(backoffManager).isNotNull();
194198
}
195199

0 commit comments

Comments
 (0)