Skip to content

Commit f360f8d

Browse files
garyrussellartembilan
authored andcommitted
GH-2375: Add KLER.unregisterListenerContainer
Resolves #2375 Also add docs for creating dynamic listener containers. **cherry-pick to 2.9.x, 2.8.x**
1 parent 3d8dcce commit f360f8d

File tree

8 files changed

+441
-1
lines changed

8 files changed

+441
-1
lines changed

Diff for: spring-kafka-docs/src/main/asciidoc/kafka.adoc

+53
Original file line numberDiff line numberDiff line change
@@ -2717,6 +2717,59 @@ Also see `interceptBeforeTx`.
27172717
|n/a
27182718
|A reference to all child `KafkaMessageListenerContainer` s.
27192719
|===
2720+
2721+
[[dynamic-containers]]
2722+
==== Dynamically Creating Containers
2723+
2724+
There are several techniques that can be used to create listener containers at runtime.
2725+
This section explores some of those techniques.
2726+
2727+
===== MessageListener Implementations
2728+
2729+
If you implement your own listener directly, you can simply use the container factory to create a raw container for that listener:
2730+
2731+
.User Listener
2732+
====
2733+
[source, java, role="primary", indent=0]
2734+
.Java
2735+
----
2736+
include::{java-examples}/dynamic/MyListener.java[tag=listener]
2737+
include::{java-examples}/dynamic/Application.java[tag=create]
2738+
----
2739+
[source, kotlin, role="secondary",indent=0]
2740+
.Kotlin
2741+
----
2742+
include::{kotlin-examples}/dynamic/Application.kt[tag=listener]
2743+
include::{kotlin-examples}/dynamic/Application.kt[tag=create]
2744+
----
2745+
====
2746+
2747+
===== Prototype Beans
2748+
2749+
Containers for methods annotated with `@KafkaListener` can be created dynamically by declaring the bean as prototype:
2750+
2751+
.Prototype
2752+
====
2753+
[source, java, role="primary", indent=0]
2754+
.Java
2755+
----
2756+
include::{java-examples}/dynamic/MyPojo.java[tag=pojo]
2757+
include::{java-examples}/dynamic/Application.java[tag=pojoBean]
2758+
include::{java-examples}/dynamic/Application.java[tag=getBeans]
2759+
----
2760+
[source, kotlin, role="secondary",indent=0]
2761+
.Kotlin
2762+
----
2763+
include::{kotlin-examples}/dynamic/Application.kt[tag=pojo]
2764+
include::{kotlin-examples}/dynamic/Application.kt[tag=pojoBean]
2765+
include::{kotlin-examples}/dynamic/Application.kt[tag=getBeans]
2766+
----
2767+
====
2768+
2769+
IMPORTANT: Listeners must have unique IDs.
2770+
Starting with version 2.8.9, the `KafkaListenerEndpointRegistry` has a new method `unregisterListenerContainer(String id)` to allow you to re-use an id.
2771+
Unregistering a container does not `stop()` the container, you must do that yourself.
2772+
27202773
[[events]]
27212774
==== Application Events
27222775

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.jdocs.dynamic;
18+
19+
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
20+
import org.springframework.boot.ApplicationRunner;
21+
import org.springframework.boot.SpringApplication;
22+
import org.springframework.boot.autoconfigure.SpringBootApplication;
23+
import org.springframework.context.ApplicationContext;
24+
import org.springframework.context.annotation.Bean;
25+
import org.springframework.context.annotation.Scope;
26+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
27+
import org.springframework.kafka.config.TopicBuilder;
28+
import org.springframework.kafka.core.KafkaAdmin;
29+
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
30+
31+
/**
32+
* Dynamic listeners.
33+
*
34+
* @author Gary Russell
35+
* @since 2.8.9
36+
*
37+
*/
38+
@SpringBootApplication
39+
public class Application {
40+
41+
public static void main(String[] args) {
42+
SpringApplication.run(Application.class, args);
43+
}
44+
45+
@Bean
46+
ApplicationRunner runner(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
47+
return args -> {
48+
createContainer(factory, "topic1", "group1");
49+
};
50+
}
51+
52+
@Bean
53+
public ApplicationRunner runner1(ApplicationContext applicationContext) {
54+
return args -> {
55+
// tag::getBeans[]
56+
57+
applicationContext.getBean(MyPojo.class, "one", "topic2");
58+
applicationContext.getBean(MyPojo.class, "two", "topic3");
59+
// end::getBeans[]
60+
};
61+
}
62+
63+
64+
// tag::create[]
65+
66+
private ConcurrentMessageListenerContainer<String, String> createContainer(
67+
ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic, String group) {
68+
69+
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
70+
container.getContainerProperties().setMessageListener(new MyListener());
71+
container.getContainerProperties().setGroupId(group);
72+
container.setBeanName(group);
73+
container.start();
74+
return container;
75+
}
76+
// end::create[]
77+
@Bean
78+
public KafkaAdmin.NewTopics topics() {
79+
return new KafkaAdmin.NewTopics(
80+
TopicBuilder.name("topic1")
81+
.partitions(10)
82+
.replicas(1)
83+
.build(),
84+
TopicBuilder.name("topic2")
85+
.partitions(10)
86+
.replicas(1)
87+
.build(),
88+
TopicBuilder.name("topic3")
89+
.partitions(10)
90+
.replicas(1)
91+
.build());
92+
}
93+
94+
// tag::pojoBean[]
95+
96+
@Bean
97+
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
98+
MyPojo pojo(String id, String topic) {
99+
return new MyPojo(id, topic);
100+
}
101+
//end::pojoBean[]
102+
103+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.jdocs.dynamic;
18+
19+
import org.apache.kafka.clients.consumer.ConsumerRecord;
20+
21+
import org.springframework.kafka.listener.MessageListener;
22+
23+
/**
24+
* {@link MessageListener} for dynamic containers.
25+
* @author Gary Russell
26+
* @since 2.8.9
27+
*
28+
*/
29+
//tag::listener[]
30+
31+
public class MyListener implements MessageListener<String, String> {
32+
33+
@Override
34+
public void onMessage(ConsumerRecord<String, String> data) {
35+
// ...
36+
}
37+
38+
}
39+
// end::listener[]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.jdocs.dynamic;
18+
19+
import org.springframework.kafka.annotation.KafkaListener;
20+
21+
/**
22+
* Pojo for dynamic listener creation.
23+
* @author Gary Russell
24+
* @since 2.8.9
25+
*
26+
*/
27+
//tag::pojo[]
28+
29+
public class MyPojo {
30+
31+
private final String id;
32+
33+
private final String topic;
34+
35+
public MyPojo(String id, String topic) {
36+
this.id = id;
37+
this.topic = topic;
38+
}
39+
40+
public String getId() {
41+
return this.id;
42+
}
43+
44+
public String getTopic() {
45+
return this.topic;
46+
}
47+
48+
@KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
49+
public void listen(String in) {
50+
System.out.println(in);
51+
}
52+
53+
}
54+
// end::pojo[]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.kafka.kdocs.dynamic
17+
18+
import org.apache.kafka.clients.consumer.ConsumerRecord
19+
import org.springframework.beans.factory.config.ConfigurableBeanFactory
20+
import org.springframework.boot.ApplicationArguments
21+
import org.springframework.boot.ApplicationRunner
22+
import org.springframework.boot.SpringApplication
23+
import org.springframework.boot.autoconfigure.SpringBootApplication
24+
import org.springframework.context.ApplicationContext
25+
import org.springframework.context.annotation.Bean
26+
import org.springframework.context.annotation.Scope
27+
import org.springframework.kafka.annotation.KafkaListener
28+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
29+
import org.springframework.kafka.config.TopicBuilder
30+
import org.springframework.kafka.core.KafkaAdmin.NewTopics
31+
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
32+
import org.springframework.kafka.listener.MessageListener
33+
34+
/**
35+
* @author Gary Russell
36+
* @since 2.8.9
37+
*/
38+
@SpringBootApplication
39+
class Application {
40+
@Bean
41+
fun runner(factory: ConcurrentKafkaListenerContainerFactory<String, String>): ApplicationRunner {
42+
return ApplicationRunner { args: ApplicationArguments? -> createContainer(factory, "topic1", "group1") }
43+
}
44+
45+
@Bean
46+
fun runner(applicationContext: ApplicationContext): ApplicationRunner {
47+
return ApplicationRunner { args: ApplicationArguments? ->
48+
// tag::getBeans[]
49+
50+
applicationContext.getBean(MyPojo::class.java, "one", arrayOf("topic2"))
51+
applicationContext.getBean(MyPojo::class.java, "two", arrayOf("topic3"))
52+
// end::getBeans[]
53+
}
54+
}
55+
56+
// tag::create[]
57+
58+
private fun createContainer(
59+
factory: ConcurrentKafkaListenerContainerFactory<String, String>, topic: String, group: String
60+
): ConcurrentMessageListenerContainer<String, String> {
61+
val container = factory.createContainer(topic)
62+
container.containerProperties.messageListener = MyListener()
63+
container.containerProperties.groupId = group
64+
container.beanName = group
65+
container.start()
66+
return container
67+
}
68+
// end::create[]
69+
@Bean
70+
fun topics(): NewTopics {
71+
return NewTopics(
72+
TopicBuilder.name("topic1")
73+
.partitions(10)
74+
.replicas(1)
75+
.build(),
76+
TopicBuilder.name("topic2")
77+
.partitions(10)
78+
.replicas(1)
79+
.build(),
80+
TopicBuilder.name("topic3")
81+
.partitions(10)
82+
.replicas(1)
83+
.build()
84+
)
85+
}
86+
87+
// tag::pojoBean[]
88+
89+
@Bean
90+
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
91+
fun pojo(id: String?, topic: String?): MyPojo {
92+
return MyPojo(id, topic)
93+
}
94+
//end::pojoBean[]
95+
96+
companion object {
97+
@JvmStatic
98+
fun main(args: Array<String>) {
99+
SpringApplication.run(Application::class.java, *args).close()
100+
}
101+
}
102+
}
103+
104+
// tag::listener[]
105+
106+
class MyListener : MessageListener<String?, String?> {
107+
108+
override fun onMessage(data: ConsumerRecord<String?, String?>) {
109+
// ...
110+
}
111+
112+
}
113+
// end::listener[]
114+
115+
// tag::pojo[]
116+
117+
class MyPojo(id: String?, topic: String?) {
118+
119+
@KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topics}"])
120+
fun listen(`in`: String?) {
121+
println(`in`)
122+
}
123+
124+
}
125+
// end::pojo[]

Diff for: spring-kafka-docs/src/main/resources/logback.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<appender-ref ref="STDOUT" />
1313
</root>
1414

15-
<logger name="org.springframework" level="WARN" />
15+
<logger name="org.springframework" level="INFO" />
1616
<logger name="org.springframework.kafka.jdocs" level="INFO" />
1717
<logger name="org.springframework.kafka.Kdocs" level="INFO" />
1818

0 commit comments

Comments
 (0)