Skip to content

Commit 15facd0

Browse files
garyrussellartembilan
authored andcommitted
SIKGH-204: Support auto config for all containers
See spring-attic/spring-integration-kafka#204 With this in place, we could pass a container factory into the KMDCA and/or a DSL spec, along with topic configuration. This would enable boot properties to be used to configure the container (and any container that is not used for a `@KafkaListener`. With the integration DSL, something like ``` IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(containerFactory, String... topics) .handle(...) ``` where the `ConsumerFactory` and `ContainerProperties` are provided by the factory. * Javadoc polishing. * Polishing - pull methods up to abstract factory * Polishing - add setAutoStartup to container interface
1 parent 6686e5a commit 15facd0

File tree

8 files changed

+253
-15
lines changed

8 files changed

+253
-15
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

+54-9
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.springframework.kafka.config;
1818

1919

20+
import java.util.Arrays;
21+
import java.util.Collection;
2022
import java.util.regex.Pattern;
2123

2224
import org.springframework.beans.BeanUtils;
@@ -31,6 +33,7 @@
3133
import org.springframework.kafka.listener.ErrorHandler;
3234
import org.springframework.kafka.listener.GenericErrorHandler;
3335
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
36+
import org.springframework.kafka.support.TopicPartitionInitialOffset;
3437
import org.springframework.kafka.support.converter.MessageConverter;
3538
import org.springframework.retry.RecoveryCallback;
3639
import org.springframework.retry.support.RetryTemplate;
@@ -241,15 +244,6 @@ public ContainerProperties getContainerProperties() {
241244
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
242245
C instance = createContainerInstance(endpoint);
243246

244-
if (this.autoStartup != null) {
245-
instance.setAutoStartup(this.autoStartup);
246-
}
247-
if (this.phase != null) {
248-
instance.setPhase(this.phase);
249-
}
250-
if (this.applicationEventPublisher != null) {
251-
instance.setApplicationEventPublisher(this.applicationEventPublisher);
252-
}
253247
if (endpoint.getId() != null) {
254248
instance.setBeanName(endpoint.getId());
255249
}
@@ -316,6 +310,57 @@ protected void initializeContainer(C instance) {
316310
if (this.errorHandler != null) {
317311
instance.setGenericErrorHandler(this.errorHandler);
318312
}
313+
if (this.autoStartup != null) {
314+
instance.setAutoStartup(this.autoStartup);
315+
}
316+
if (this.phase != null) {
317+
instance.setPhase(this.phase);
318+
}
319+
if (this.applicationEventPublisher != null) {
320+
instance.setApplicationEventPublisher(this.applicationEventPublisher);
321+
}
322+
}
323+
324+
@Override
325+
public C createContainer(final Collection<TopicPartitionInitialOffset> topicPartitions) {
326+
C container = createContainerInstance(new KafkaListenerEndpointAdapter() {
327+
328+
@Override
329+
public Collection<TopicPartitionInitialOffset> getTopicPartitions() {
330+
return topicPartitions;
331+
}
332+
333+
});
334+
initializeContainer(container);
335+
return container;
336+
}
337+
338+
@Override
339+
public C createContainer(final String... topics) {
340+
C container = createContainerInstance(new KafkaListenerEndpointAdapter() {
341+
342+
@Override
343+
public Collection<String> getTopics() {
344+
return Arrays.asList(topics);
345+
}
346+
347+
});
348+
initializeContainer(container);
349+
return container;
350+
}
351+
352+
@Override
353+
public C createContainer(final Pattern topicPattern) {
354+
C container = createContainerInstance(new KafkaListenerEndpointAdapter() {
355+
356+
@Override
357+
public Pattern getTopicPattern() {
358+
return topicPattern;
359+
}
360+
361+
});
362+
initializeContainer(container);
363+
return container;
319364
}
320365

321366
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@
2626
* A {@link KafkaListenerContainerFactory} implementation to build a
2727
* {@link ConcurrentMessageListenerContainer}.
2828
* <p>
29-
* This should be the default for most users and a good transition paths
30-
* for those that are used to build such container definitions manually.
29+
* This should be the default for most users and a good transition paths for those that
30+
* are used to building such container definitions manually.
31+
*
32+
* This factory is primarily for building containers for {@code KafkaListener} annotated
33+
* methods but can also be used to create any container.
3134
*
3235
* @param <K> the key type.
3336
* @param <V> the value type.
@@ -81,3 +84,4 @@ protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> inst
8184
}
8285

8386
}
87+

Diff for: spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerContainerFactory.java

+34-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,15 +16,19 @@
1616

1717
package org.springframework.kafka.config;
1818

19+
import java.util.Collection;
20+
import java.util.regex.Pattern;
21+
1922
import org.springframework.kafka.listener.MessageListenerContainer;
23+
import org.springframework.kafka.support.TopicPartitionInitialOffset;
2024

2125
/**
22-
* Factory of {@link MessageListenerContainer} based on a
23-
* {@link KafkaListenerEndpoint} definition.
26+
* Factory for {@link MessageListenerContainer}s.
2427
*
2528
* @param <C> the {@link MessageListenerContainer} implementation type.
2629
*
2730
* @author Stephane Nicoll
31+
* @author Gary Russell
2832
*
2933
* @see KafkaListenerEndpoint
3034
*/
@@ -37,4 +41,31 @@ public interface KafkaListenerContainerFactory<C extends MessageListenerContaine
3741
*/
3842
C createListenerContainer(KafkaListenerEndpoint endpoint);
3943

44+
/**
45+
* Create and configure a container without a listener; used to create containers that
46+
* are not used for KafkaListener annotations.
47+
* @param topicPartitions the topicPartitions.
48+
* @return the container.
49+
* @since 2.2
50+
*/
51+
C createContainer(Collection<TopicPartitionInitialOffset> topicPartitions);
52+
53+
/**
54+
* Create and configure a container without a listener; used to create containers that
55+
* are not used for KafkaListener annotations.
56+
* @param topics the topics.
57+
* @return the container.
58+
* @since 2.2
59+
*/
60+
C createContainer(String... topics);
61+
62+
/**
63+
* Create and configure a container without a listener; used to create containers that
64+
* are not used for KafkaListener annotations.
65+
* @param topicPattern the topicPattern.
66+
* @return the container.
67+
* @since 2.2
68+
*/
69+
C createContainer(Pattern topicPattern);
70+
4071
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.config;
18+
19+
import java.util.Collection;
20+
import java.util.Collections;
21+
import java.util.regex.Pattern;
22+
23+
import org.springframework.kafka.listener.MessageListenerContainer;
24+
import org.springframework.kafka.support.TopicPartitionInitialOffset;
25+
import org.springframework.kafka.support.converter.MessageConverter;
26+
27+
/**
28+
* Adapter to avoid having to implement all methods.
29+
*
30+
* @author Gary Russell
31+
* @since 2.2
32+
*
33+
*/
34+
class KafkaListenerEndpointAdapter implements KafkaListenerEndpoint {
35+
36+
KafkaListenerEndpointAdapter() {
37+
super();
38+
}
39+
40+
@Override
41+
public String getId() {
42+
return null;
43+
}
44+
45+
@Override
46+
public String getGroupId() {
47+
return null;
48+
}
49+
50+
@Override
51+
public String getGroup() {
52+
return null;
53+
}
54+
55+
@Override
56+
public Collection<String> getTopics() {
57+
return Collections.emptyList();
58+
}
59+
60+
@Override
61+
public Collection<TopicPartitionInitialOffset> getTopicPartitions() {
62+
return Collections.emptyList();
63+
}
64+
65+
@Override
66+
public Pattern getTopicPattern() {
67+
return null;
68+
}
69+
70+
@Override
71+
public String getClientIdPrefix() {
72+
return null;
73+
}
74+
75+
@Override
76+
public void setupListenerContainer(MessageListenerContainer listenerContainer,
77+
MessageConverter messageConverter) {
78+
}
79+
80+
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

+10
Original file line numberDiff line numberDiff line change
@@ -104,4 +104,14 @@ default boolean isContainerPaused() {
104104
throw new UnsupportedOperationException("This container doesn't support pause/resume");
105105
}
106106

107+
/**
108+
* Set the autoStartup.
109+
* @param autoStartup the autoStartup to set.
110+
* @since 2.2
111+
* @see SmartLifecycle
112+
*/
113+
default void setAutoStartup(boolean autoStartup) {
114+
// empty
115+
}
116+
107117
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.annotation;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.Mockito.mock;
21+
22+
import org.junit.jupiter.api.Test;
23+
24+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
25+
import org.springframework.kafka.core.ConsumerFactory;
26+
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
27+
import org.springframework.kafka.test.utils.KafkaTestUtils;
28+
29+
/**
30+
* @author Gary Russell
31+
* @since 2.2
32+
*
33+
*/
34+
public class ContainerFactoryTests {
35+
36+
@Test
37+
public void testConfigContainer() {
38+
ConcurrentKafkaListenerContainerFactory<String, String> factory =
39+
new ConcurrentKafkaListenerContainerFactory<>();
40+
factory.setAutoStartup(false);
41+
factory.setConcurrency(22);
42+
@SuppressWarnings("unchecked")
43+
ConsumerFactory<String, String> cf = mock(ConsumerFactory.class);
44+
factory.setConsumerFactory(cf);
45+
factory.setPhase(42);
46+
factory.getContainerProperties().setAckCount(123);
47+
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("foo");
48+
assertThat(container.isAutoStartup()).isFalse();
49+
assertThat(container.getPhase()).isEqualTo(42);
50+
assertThat(container.getContainerProperties().getAckCount()).isEqualTo(123);
51+
assertThat(KafkaTestUtils.getPropertyValue(container, "concurrency", Integer.class)).isEqualTo(22);
52+
}
53+
54+
}

Diff for: src/reference/asciidoc/kafka.adoc

+9
Original file line numberDiff line numberDiff line change
@@ -1375,6 +1375,15 @@ You can also perform seek operations from `onIdleContainer()` when an idle conta
13751375

13761376
To arbitrarily seek at runtime, use the callback reference from the `registerSeekCallback` for the appropriate thread.
13771377

1378+
[[container-factory]]
1379+
===== Container factory
1380+
1381+
As discussed in <<kafka-listener-annotation>> a `ConcurrentKafkaListenerContainerFactory` is used to create containers for annotated methods.
1382+
1383+
Starting with _version 2.2_, the same factory can be used to create any `ConcurrentMessageListenerContainer`.
1384+
This might be useful if you want to create several containers with similar properties, or you wish to use some exernally configured factory, such as the one provided by Spring Boot auto configuration.
1385+
Once the container is created, you can further modify its properties, many of which are set by using `container.getContainerProperties()`.
1386+
13781387
[[pause-resume]]
13791388
==== Pausing/Resuming Listener Containers
13801389

Diff for: src/reference/asciidoc/whats-new.adoc

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
=== What's new in 2.1 Since 2.0
1+
=== What's new in 2.2 Since 2.1
22

33
==== Kafka Client Version
44

@@ -15,3 +15,8 @@ The enum `AckMode` has been moved from `AbstractMessageListenerContainer` to `Co
1515
==== After rollback processing
1616

1717
A new `AfterRollbackProcessor` strategy is provided - see <<after-rollback>> for more information.
18+
19+
==== ConcurrentKafkaListenerContainerFactory changes
20+
21+
The `ConcurrentKafkaListenerContainerFactory` can now be used to create/configure any `ConcurrentMessageListenerContainer`, not just those for `@KafkaListener` annotations.
22+
See <<container-factory>> for more information.

0 commit comments

Comments
 (0)