Skip to content

Commit f6c0996

Browse files
garyrussellartembilan
authored andcommitted
GH-2297: Add KLERegistry.alwaysStartAfterRefresh
Resolves #2297 Add an option to apply `autoStartup` semantics to late registrations. **cherry-pick to 2.9.x, 2.8.x**
1 parent a6aff27 commit f6c0996

File tree

3 files changed

+47
-2
lines changed

3 files changed

+47
-2
lines changed

Diff for: spring-kafka-docs/src/main/asciidoc/kafka.adoc

+4
Original file line numberDiff line numberDiff line change
@@ -1953,6 +1953,10 @@ A collection of managed containers can be obtained by calling the registry's `ge
19531953
Version 2.2.5 added a convenience method `getAllListenerContainers()`, which returns a collection of all containers, including those managed by the registry and those declared as beans.
19541954
The collection returned will include any prototype beans that have been initialized, but it will not initialize any lazy bean declarations.
19551955

1956+
IMPORTANT: Endpoints registered after the application context has been refreshed will start immediately, regardless of their `autoStartup` property, to comply with the `SmartLifecycle` contract, where `autoStartup` is only considered during application context initialization.
1957+
An example of late registration is a bean with a `@KafkaListener` in prototype scope where an instance is created after the context is initialized.
1958+
Starting with version 2.8.7, you can set the registry's `alwaysStartAfterRefresh` property to `false` and then the container's `autoStartup` property will define whether or not the container is started.
1959+
19561960
[[kafka-validation]]
19571961
===== `@KafkaListener` `@Payload` Validation
19581962

Diff for: spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ public class KafkaListenerEndpointRegistry implements ListenerContainerRegistry,
8383

8484
private boolean contextRefreshed;
8585

86+
private boolean alwaysStartAfterRefresh = true;
87+
8688
private volatile boolean running;
8789

8890
@Override
@@ -107,6 +109,20 @@ public MessageListenerContainer getListenerContainer(String id) {
107109
return this.listenerContainers.get(id);
108110
}
109111

112+
/**
113+
* By default, containers registered for endpoints after the context is refreshed
114+
* are immediately started, regardless of their autoStartup property, to comply with
115+
* the {@link SmartLifecycle} contract, where autoStartup is only considered during
116+
* context initialization. Set to false to apply the autoStartup property, even for
117+
* late endpoint binding. If this is called after the context is refreshed, it will
118+
* apply to any endpoints registered after that call.
119+
* @param alwaysStartAfterRefresh false to apply the property.
120+
* @since 2.8.7
121+
*/
122+
public void setAlwaysStartAfterRefresh(boolean alwaysStartAfterRefresh) {
123+
this.alwaysStartAfterRefresh = alwaysStartAfterRefresh;
124+
}
125+
110126
/**
111127
* Return the ids of the managed {@link MessageListenerContainer} instance(s).
112128
* @return the ids.
@@ -327,7 +343,7 @@ public void onApplicationEvent(ContextRefreshedEvent event) {
327343
* @see MessageListenerContainer#isAutoStartup()
328344
*/
329345
private void startIfNecessary(MessageListenerContainer listenerContainer) {
330-
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
346+
if ((this.contextRefreshed && this.alwaysStartAfterRefresh) || listenerContainer.isAutoStartup()) {
331347
listenerContainer.start();
332348
}
333349
}

Diff for: spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

+26-1
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,12 @@
7171
import org.springframework.beans.factory.annotation.Autowired;
7272
import org.springframework.beans.factory.config.BeanDefinition;
7373
import org.springframework.beans.factory.config.BeanPostProcessor;
74+
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
75+
import org.springframework.context.ApplicationContext;
7476
import org.springframework.context.annotation.Bean;
7577
import org.springframework.context.annotation.Configuration;
7678
import org.springframework.context.annotation.Role;
79+
import org.springframework.context.annotation.Scope;
7780
import org.springframework.context.event.EventListener;
7881
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
7982
import org.springframework.core.MethodParameter;
@@ -179,7 +182,7 @@
179182
"annotated25", "annotated25reply1", "annotated25reply2", "annotated26", "annotated27", "annotated28",
180183
"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
181184
"annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle",
182-
"annotated38", "annotated38reply", "annotated39", "annotated40", "annotated41" })
185+
"annotated38", "annotated38reply", "annotated39", "annotated40", "annotated41", "annotated42" })
183186
@TestPropertySource(properties = "spel.props=fetch.min.bytes=420000,max.poll.records=10")
184187
public class EnableKafkaIntegrationTests {
185188

@@ -981,6 +984,14 @@ public void testContentConversion() throws InterruptedException {
981984
assertThat(this.listener.contentFoo).isEqualTo(new Foo("bar"));
982985
}
983986

987+
@Test
988+
void proto(@Autowired ApplicationContext context) {
989+
this.registry.setAlwaysStartAfterRefresh(false);
990+
context.getBean(ProtoListener.class);
991+
assertThat(this.registry.getListenerContainer("proto").isRunning()).isFalse();
992+
this.registry.setAlwaysStartAfterRefresh(true);
993+
}
994+
984995
@Configuration
985996
@EnableKafka
986997
@EnableTransactionManagement(proxyTargetClass = true)
@@ -1710,6 +1721,20 @@ String barInfo() {
17101721
return "info for the bar listener";
17111722
}
17121723

1724+
@Bean
1725+
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
1726+
ProtoListener proto() {
1727+
return new ProtoListener();
1728+
}
1729+
1730+
}
1731+
1732+
static class ProtoListener {
1733+
1734+
@KafkaListener(id = "proto", topics = "annotated-42", autoStartup = "false")
1735+
public void listen(String in) {
1736+
}
1737+
17131738
}
17141739

17151740
@Component

0 commit comments

Comments
 (0)