Skip to content

Commit e77e1ef

Browse files
nyilmazartembilan
authored andcommitted
GH-621: Add KafkaStreams customizer
Fixes #621 Refactor; - Revert configurer - Add setters to `StreamsBuilderFactoryBean` - Create `CompositeKafkaStreamsCustomizer` Polish, update docs. * Polishing code style, Docs and Java Docs
1 parent 202f73c commit e77e1ef

File tree

5 files changed

+243
-12
lines changed

5 files changed

+243
-12
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
22+
import org.apache.kafka.streams.KafkaStreams;
23+
24+
/**
25+
* Composite {@link KafkaStreamsCustomizer} customizes {@link KafkaStreams} by delegating
26+
* to a list of provided {@link KafkaStreamsCustomizer}.
27+
*
28+
* @author Nurettin Yilmaz
29+
* @author Artem Bilan
30+
*
31+
* @since 2.1.5
32+
*/
33+
public class CompositeKafkaStreamsCustomizer implements KafkaStreamsCustomizer {
34+
35+
private final List<KafkaStreamsCustomizer> kafkaStreamsCustomizers = new ArrayList<>();
36+
37+
public CompositeKafkaStreamsCustomizer() {
38+
}
39+
40+
public CompositeKafkaStreamsCustomizer(List<KafkaStreamsCustomizer> kafkaStreamsCustomizers) {
41+
this.kafkaStreamsCustomizers.addAll(kafkaStreamsCustomizers);
42+
}
43+
44+
@Override
45+
public void customize(KafkaStreams kafkaStreams) {
46+
this.kafkaStreamsCustomizers.forEach(kafkaStreamsCustomizer -> kafkaStreamsCustomizer.customize(kafkaStreams));
47+
}
48+
49+
public void addKafkaStreamsCustomizers(List<KafkaStreamsCustomizer> kafkaStreamsCustomizers) {
50+
this.kafkaStreamsCustomizers.addAll(kafkaStreamsCustomizers);
51+
}
52+
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import org.apache.kafka.streams.KafkaStreams;
20+
21+
/**
22+
* Callback interface that can be used to configure {@link KafkaStreams} directly.
23+
*
24+
* @author Nurettin Yilmaz
25+
*
26+
* @since 2.1.5
27+
*
28+
* @see StreamsBuilderFactoryBean
29+
*/
30+
@FunctionalInterface
31+
public interface KafkaStreamsCustomizer {
32+
33+
void customize(KafkaStreams kafkaStreams);
34+
35+
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/core/StreamsBuilderFactoryBean.java

+43-11
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.kafka.streams.KafkaStreams;
2424
import org.apache.kafka.streams.StreamsBuilder;
2525
import org.apache.kafka.streams.StreamsConfig;
26+
import org.apache.kafka.streams.Topology;
27+
import org.apache.kafka.streams.processor.StateRestoreListener;
2628
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
2729

2830
import org.springframework.beans.factory.config.AbstractFactoryBean;
@@ -35,35 +37,43 @@
3537
* An {@link AbstractFactoryBean} for the {@link StreamsBuilder} instance
3638
* and lifecycle control for the internal {@link KafkaStreams} instance.
3739
*
40+
* <p>A fine grained control on {@link KafkaStreams} can be achieved by
41+
* {@link KafkaStreamsCustomizer}s</p>
42+
*
3843
* @author Artem Bilan
3944
* @author Ivan Ursul
4045
* @author Soby Chacko
4146
* @author Zach Olauson
47+
* @author Nurettin Yilmaz
4248
*
4349
* @since 1.1.4
4450
*/
4551
public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilder> implements SmartLifecycle {
4652

4753
private static final int DEFAULT_CLOSE_TIMEOUT = 10;
4854

49-
private final CleanupConfig cleanupConfig;
55+
private KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();
5056

5157
private StreamsConfig streamsConfig;
5258

53-
private KafkaStreams kafkaStreams;
59+
private final CleanupConfig cleanupConfig;
5460

55-
private KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();
61+
private KafkaStreamsCustomizer kafkaStreamsCustomizer;
5662

57-
private boolean autoStartup = true;
63+
private KafkaStreams.StateListener stateListener;
5864

59-
private int phase = Integer.MAX_VALUE - 1000;
65+
private StateRestoreListener stateRestoreListener;
6066

61-
private KafkaStreams.StateListener stateListener;
67+
private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
68+
69+
private boolean autoStartup = true;
6270

63-
private Thread.UncaughtExceptionHandler exceptionHandler;
71+
private int phase = Integer.MAX_VALUE - 1000;
6472

6573
private int closeTimeout = DEFAULT_CLOSE_TIMEOUT;
6674

75+
private KafkaStreams kafkaStreams;
76+
6777
private volatile boolean running;
6878

6979
/**
@@ -140,12 +150,27 @@ public void setClientSupplier(KafkaClientSupplier clientSupplier) {
140150
this.clientSupplier = clientSupplier; // NOSONAR (sync)
141151
}
142152

153+
/**
154+
* Specify a {@link KafkaStreamsCustomizer} to customize a {@link KafkaStreams}
155+
* instance during {@link #start()}.
156+
* @param kafkaStreamsCustomizer the {@link KafkaStreamsCustomizer} to use.
157+
* @since 2.1.5
158+
*/
159+
public void setKafkaStreamsCustomizer(KafkaStreamsCustomizer kafkaStreamsCustomizer) {
160+
Assert.notNull(kafkaStreamsCustomizer, "'kafkaStreamsCustomizer' must not be null");
161+
this.kafkaStreamsCustomizer = kafkaStreamsCustomizer; // NOSONAR (sync)
162+
}
163+
143164
public void setStateListener(KafkaStreams.StateListener stateListener) {
144165
this.stateListener = stateListener; // NOSONAR (sync)
145166
}
146167

147168
public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler exceptionHandler) {
148-
this.exceptionHandler = exceptionHandler; // NOSONAR (sync)
169+
this.uncaughtExceptionHandler = exceptionHandler; // NOSONAR (sync)
170+
}
171+
172+
public void setStateRestoreListener(StateRestoreListener stateRestoreListener) {
173+
this.stateRestoreListener = stateRestoreListener; // NOSONAR (sync)
149174
}
150175

151176
/**
@@ -171,7 +196,6 @@ protected StreamsBuilder createInstance() throws Exception {
171196
return new StreamsBuilder();
172197
}
173198

174-
175199
public void setAutoStartup(boolean autoStartup) {
176200
this.autoStartup = autoStartup;
177201
}
@@ -198,9 +222,17 @@ public synchronized void start() {
198222
if (!this.running) {
199223
try {
200224
Assert.notNull(this.streamsConfig, "'streamsConfig' must not be null");
201-
this.kafkaStreams = new KafkaStreams(getObject().build(), this.streamsConfig, this.clientSupplier);
225+
Topology topology = getObject().build();
226+
if (logger.isDebugEnabled()) {
227+
logger.debug(topology.describe());
228+
}
229+
this.kafkaStreams = new KafkaStreams(topology, this.streamsConfig, this.clientSupplier);
202230
this.kafkaStreams.setStateListener(this.stateListener);
203-
this.kafkaStreams.setUncaughtExceptionHandler(this.exceptionHandler);
231+
this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener);
232+
this.kafkaStreams.setUncaughtExceptionHandler(this.uncaughtExceptionHandler);
233+
if (this.kafkaStreamsCustomizer != null) {
234+
this.kafkaStreamsCustomizer.customize(this.kafkaStreams);
235+
}
204236
if (this.cleanupConfig.cleanupOnStart()) {
205237
this.kafkaStreams.cleanUp();
206238
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
24+
import org.apache.kafka.streams.KafkaStreams;
25+
import org.apache.kafka.streams.StreamsConfig;
26+
import org.junit.jupiter.api.Test;
27+
28+
import org.springframework.beans.factory.annotation.Autowired;
29+
import org.springframework.beans.factory.annotation.Value;
30+
import org.springframework.context.annotation.Bean;
31+
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.kafka.annotation.EnableKafka;
33+
import org.springframework.kafka.annotation.EnableKafkaStreams;
34+
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
35+
import org.springframework.kafka.test.context.EmbeddedKafka;
36+
import org.springframework.kafka.test.rule.KafkaEmbedded;
37+
import org.springframework.test.annotation.DirtiesContext;
38+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
39+
40+
41+
/**
42+
* @author Nurettin Yilmaz
43+
*
44+
* @since 2.1.5
45+
*/
46+
@SpringJUnitConfig
47+
@DirtiesContext
48+
@EmbeddedKafka
49+
public class KafkaStreamsCustomizerTests {
50+
51+
private static final String APPLICATION_ID = "testStreams";
52+
53+
private static final TestStateListener STATE_LISTENER = new TestStateListener();
54+
55+
@Autowired
56+
private StreamsBuilderFactoryBean streamsBuilderFactoryBean;
57+
58+
@Test
59+
public void testKafkaStreamsCustomizer() {
60+
KafkaStreams.State state = this.streamsBuilderFactoryBean.getKafkaStreams().state();
61+
assertThat(STATE_LISTENER.getCurrentState()).isEqualTo(state);
62+
}
63+
64+
@Configuration
65+
@EnableKafka
66+
@EnableKafkaStreams
67+
public static class KafkaStreamsConfiguration {
68+
69+
@Value("${" + KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
70+
private String brokerAddresses;
71+
72+
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
73+
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder() {
74+
StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kStreamsConfigs());
75+
streamsBuilderFactoryBean.setKafkaStreamsCustomizer(customizer());
76+
return streamsBuilderFactoryBean;
77+
}
78+
79+
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
80+
public StreamsConfig kStreamsConfigs() {
81+
Map<String, Object> props = new HashMap<>();
82+
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
83+
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
84+
return new StreamsConfig(props);
85+
}
86+
87+
88+
private KafkaStreamsCustomizer customizer() {
89+
return kafkaStreams -> kafkaStreams.setStateListener(STATE_LISTENER);
90+
}
91+
92+
}
93+
94+
static class TestStateListener implements KafkaStreams.StateListener {
95+
96+
private KafkaStreams.State currentState;
97+
98+
@Override
99+
public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
100+
this.currentState = newState;
101+
}
102+
103+
KafkaStreams.State getCurrentState() {
104+
return this.currentState;
105+
}
106+
107+
}
108+
109+
}

Diff for: src/reference/asciidoc/streams.adoc

+3-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ If you would like to control lifecycle manually (e.g. stop and start by some con
7070
Since `StreamsBuilderFactoryBean` utilize its internal `KafkaStreams` instance, it is safe to stop and restart it again - a new `KafkaStreams` is created on each `start()`.
7171
Also consider using different `StreamsBuilderFactoryBean` s, if you would like to control lifecycles for `KStream` instances separately.
7272

73-
You can specify `KafkaStreams.StateListener` and `Thread.UncaughtExceptionHandler` options on the `StreamsBuilderFactoryBean` which are delegated to the internal `KafkaStreams` instance.
73+
You also can specify `KafkaStreams.StateListener`, `Thread.UncaughtExceptionHandler` and `StateRestoreListener` options on the `StreamsBuilderFactoryBean` which are delegated to the internal `KafkaStreams` instance.
74+
Also apart from setting those options indirectly on `StreamsBuilderFactoryBean`, starting with _version 2.1.5_, a `KafkaStreamsCustomizer` callback interface can be used to configure inner `KafkaStreams` instance.
75+
Note that `KafkaStreamsCustomizer` will override the options which are given via `StreamsBuilderFactoryBean`.
7476
That internal `KafkaStreams` instance can be accessed via `StreamsBuilderFactoryBean.getKafkaStreams()` if you need to perform some `KafkaStreams` operations directly.
7577
You can autowire `StreamsBuilderFactoryBean` bean by type, but you should be sure that you use full type in the bean definition, for example:
7678

0 commit comments

Comments
 (0)