Skip to content

Commit 43a8b2d

Browse files
garyrussellartembilan
authored andcommitted
2.0.0 (Streams) Doc Polishing
`Properties` instead of `StreamsConfig`.
1 parent b845fbe commit 43a8b2d

File tree

2 files changed

+20
-10
lines changed

2 files changed

+20
-10
lines changed

Diff for: src/reference/asciidoc/streams.adoc

+14-10
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,13 @@ This is an `AbstractFactoryBean` implementation to expose a `StreamsBuilder` sin
4646
[source, java]
4747
----
4848
@Bean
49-
public FactoryBean<StreamsBuilderFactoryBean> myKStreamBuilder(StreamsConfig streamsConfig) {
49+
public FactoryBean<StreamsBuilderFactoryBean> myKStreamBuilder(Properties streamsConfig) {
5050
return new StreamsBuilderFactoryBean(streamsConfig);
5151
}
5252
----
5353

54+
IMPORTANT: Starting with version 2.2, the stream configuration is now provided as a simple `Properties` object, rather than a `StreamsConfig`.
55+
5456
The `StreamsBuilderFactoryBean` also implements `SmartLifecycle` to manage lifecycle of an internal `KafkaStreams` instance.
5557
Similar to the Kafka Streams API, the `KStream` instances must be defined before starting the `KafkaStreams`, and that also applies for the Spring API for Kafka Streams.
5658
Therefore we have to declare `KStream` s on the `StreamsBuilder` before the application context is refreshed, when we use default `autoStartup = true` on the `StreamsBuilderFactoryBean`.
@@ -79,7 +81,7 @@ You can autowire `StreamsBuilderFactoryBean` bean by type, but you should be sur
7981
[source,java]
8082
----
8183
@Bean
82-
public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
84+
public StreamsBuilderFactoryBean myKStreamBuilder(Properties streamsConfig) {
8385
return new StreamsBuilderFactoryBean(streamsConfig);
8486
}
8587
...
@@ -91,7 +93,7 @@ Or add `@Qualifier` for injection by name if you use interface bean definition:
9193
[source,java]
9294
----
9395
@Bean
94-
public FactoryBean<StreamsBuilder> myKStreamBuilder(StreamsConfig streamsConfig) {
96+
public FactoryBean<StreamsBuilder> myKStreamBuilder(Properties streamsConfig) {
9597
return new StreamsBuilderFactoryBean(streamsConfig);
9698
}
9799
...
@@ -115,12 +117,14 @@ IMPORTANT: Since Kafka Streams do not support headers, the `addTypeInfo` propert
115117

116118
==== Configuration
117119

118-
To configure the Kafka Streams environment, the `StreamsBuilderFactoryBean` requires a `Map` of particular properties or a `StreamsConfig` instance.
120+
To configure the Kafka Streams environment, the `StreamsBuilderFactoryBean` requires a `Properties` instance.
119121
See Apache Kafka https://kafka.apache.org/0102/documentation/#streamsconfigs[documentation] for all possible options.
120122

121-
To avoid boilerplate code for most cases, especially when you develop micro services, Spring for Apache Kafka provides the `@EnableKafkaStreams` annotation, which should be placed alongside with `@Configuration`.
122-
Only you need is to declare `StreamsConfig` bean with the `defaultKafkaStreamsConfig` name.
123-
A `StreamsBuilder` bean with the `defaultKafkaStreamsBuilder` name will be declare in the application context automatically.
123+
IMPORTANT: Starting with version 2.2, the stream configuration is now provided as a simple `Properties` object, rather than a `StreamsConfig`.
124+
125+
To avoid boilerplate code for most cases, especially when you develop micro services, Spring for Apache Kafka provides the `@EnableKafkaStreams` annotation, which should be placed on a `@Configuration` class.
126+
All you need is to declare a `Properties` bean with the name `defaultKafkaStreamsConfig`.
127+
A `StreamsBuilder` bean, with the name `defaultKafkaStreamsBuilder`, will be declared in the application context automatically.
124128
Any additional `StreamsBuilderFactoryBean` beans can be declared and used as well.
125129

126130
By default, when the factory bean is stopped, the `KafkaStreams.cleanUp()` method is called.
@@ -138,13 +142,13 @@ Putting it all together:
138142
public static class KafkaStreamsConfiguration {
139143
140144
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
141-
public StreamsConfig kStreamsConfigs() {
142-
Map<String, Object> props = new HashMap<>();
145+
public Properties kStreamsConfigs() {
146+
Properties props = new Properties<>();
143147
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
144148
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
145149
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
146150
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
147-
return new StreamsConfig(props);
151+
return props;
148152
}
149153
150154
@Bean

Diff for: src/reference/asciidoc/whats-new.adoc

+6
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,9 @@ You can now provide type mapping information using producer/consumer properties.
7171
New constructors are available on the deserializer to allow overriding the type header information with the supplied target type.
7272

7373
See <<serdes>> for more information.
74+
75+
==== Kafka Streams Changes
76+
77+
The streams configuration bean must now be a simple `Properties` object instead of a `StreamsConfig`.
78+
79+
See <<kafka-streams>> for more information.

0 commit comments

Comments
 (0)