Skip to content

Commit 43df65f

Browse files
authored
Add an option to leave group when closing streams
Fixes: spring-projects#3168 * Adding an option in `StreamsBuilderFactoryBean` to allow the consumer to leave the group upon closing the Kafka Streams.
1 parent ae775d8 commit 43df65f

File tree

2 files changed

+24
-1
lines changed

2 files changed

+24
-1
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc

+8
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,14 @@ You can declare and use any additional `StreamsBuilderFactoryBean` beans as well
213213
You can perform additional customization of that bean, by providing a bean that implements `StreamsBuilderFactoryBeanConfigurer`.
214214
If there are multiple such beans, they will be applied according to their `Ordered.order` property.
215215

216+
217+
=== Cleanup & Stop configuration
218+
219+
When the factory is stopped, the `KafkaStreams.close()` is called with 2 parameters :
220+
221+
* closeTimeout : how long to to wait for the threads to shutdown (defaults to `DEFAULT_CLOSE_TIMEOUT` set to 10 seconds). Can be configured using `StreamsBuilderFactoryBean.setCloseTimeout()`.
222+
* leaveGroupOnClose : to trigger consumer leave call from the group (defaults to `false`). Can be configured using `StreamsBuilderFactoryBean.setLeaveGroupOnClose()`.
223+
216224
By default, when the factory bean is stopped, the `KafkaStreams.cleanUp()` method is called.
217225
Starting with version 2.1.2, the factory bean has additional constructors, taking a `CleanupConfig` object that has properties to let you control whether the `cleanUp()` method is called during `start()` or `stop()` or neither.
218226
Starting with version 2.7, the default is to never clean up local state.

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
* @author Nurettin Yilmaz
5656
* @author Denis Washington
5757
* @author Gary Russell
58+
* @author Julien Wittouck
5859
*
5960
* @since 1.1.4
6061
*/
@@ -100,6 +101,8 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde
100101

101102
private Duration closeTimeout = DEFAULT_CLOSE_TIMEOUT;
102103

104+
private boolean leaveGroupOnClose = false;
105+
103106
private KafkaStreams kafkaStreams;
104107

105108
private volatile boolean running;
@@ -225,6 +228,15 @@ public void setCloseTimeout(int closeTimeout) {
225228
this.closeTimeout = Duration.ofSeconds(closeTimeout); // NOSONAR (sync)
226229
}
227230

231+
/**
232+
* Specify if the consumer should leave the group when stopping Kafka Streams. Defaults to false.
233+
* @param leaveGroupOnClose true to leave the group when stopping the Streams
234+
* @since 3.2.0
235+
*/
236+
public void setLeaveGroupOnClose(boolean leaveGroupOnClose) {
237+
this.leaveGroupOnClose = leaveGroupOnClose;
238+
}
239+
228240
/**
229241
* Providing access to the associated {@link Topology} of this
230242
* {@link StreamsBuilderFactoryBean}.
@@ -383,7 +395,10 @@ public void stop() {
383395
if (this.running) {
384396
try {
385397
if (this.kafkaStreams != null) {
386-
this.kafkaStreams.close(this.closeTimeout);
398+
this.kafkaStreams.close(new KafkaStreams.CloseOptions()
399+
.timeout(this.closeTimeout)
400+
.leaveGroup(this.leaveGroupOnClose)
401+
);
387402
if (this.cleanupConfig.cleanupOnStop()) {
388403
this.kafkaStreams.cleanUp();
389404
}

0 commit comments

Comments
 (0)