diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 33efab12c531..550e3f424dff 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -166,6 +166,7 @@ private void configureContainer(ContainerProperties container) { PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); Listener properties = this.properties.getListener(); map.from(properties::getAckMode).to(container::setAckMode); + map.from(properties::getAsyncAcks).to(container::setAsyncAcks); map.from(properties::getClientId).to(container::setClientId); map.from(properties::getAckCount).to(container::setAckCount); map.from(properties::getAckTime).as(Duration::toMillis).to(container::setAckTime); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index adc9991c272d..9845110a82d5 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -881,6 +881,13 @@ public enum Type { */ private AckMode ackMode; + /** + * Support for asynchronous record acknowledgments. Only applies with + * ContainerProperties.AckMode.MANUAL or + * ContainerProperties.AckMode.MANUAL_IMMEDIATE. + */ + private Boolean asyncAcks; + /** * Prefix for the listener's consumer client.id property. */ @@ -969,6 +976,14 @@ public void setAckMode(AckMode ackMode) { this.ackMode = ackMode; } + public Boolean getAsyncAcks() { + return this.asyncAcks; + } + + public void setAsyncAcks(Boolean asyncAcks) { + this.asyncAcks = asyncAcks; + } + public String getClientId() { return this.clientId; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 913c39025258..b61bc1528ebe 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -450,18 +450,20 @@ void streamsApplicationIdIsNotMandatoryIfEnableKafkaStreamsIsNotSet() { @SuppressWarnings("unchecked") @Test void listenerProperties() { - this.contextRunner.withPropertyValues("spring.kafka.template.default-topic=testTopic", - "spring.kafka.template.transaction-id-prefix=txOverride", "spring.kafka.listener.ack-mode=MANUAL", - "spring.kafka.listener.client-id=client", "spring.kafka.listener.ack-count=123", - "spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3", - "spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.no-poll-threshold=2.5", - "spring.kafka.listener.type=batch", "spring.kafka.listener.idle-between-polls=1s", - "spring.kafka.listener.idle-event-interval=1s", - "spring.kafka.listener.idle-partition-event-interval=1s", "spring.kafka.listener.monitor-interval=45", - "spring.kafka.listener.log-container-config=true", "spring.kafka.listener.missing-topics-fatal=true", - "spring.kafka.jaas.enabled=true", "spring.kafka.listener.immediate-stop=true", - "spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo", - "spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true") + this.contextRunner + .withPropertyValues("spring.kafka.template.default-topic=testTopic", + "spring.kafka.template.transaction-id-prefix=txOverride", + "spring.kafka.listener.ack-mode=MANUAL", "spring.kafka.listener.client-id=client", + "spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456", + "spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000", + "spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch", + "spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s", + "spring.kafka.listener.idle-partition-event-interval=1s", + "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true", + "spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true", + "spring.kafka.listener.immediate-stop=true", "spring.kafka.producer.transaction-id-prefix=foo", + "spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE", + "spring.kafka.jaas.options.useKeyTab=true", "spring.kafka.listener.async-acks=true") .run((context) -> { DefaultKafkaProducerFactory producerFactory = context .getBean(DefaultKafkaProducerFactory.class); @@ -477,6 +479,7 @@ void listenerProperties() { assertThat(kafkaListenerContainerFactory.getConsumerFactory()).isEqualTo(consumerFactory); ContainerProperties containerProperties = kafkaListenerContainerFactory.getContainerProperties(); assertThat(containerProperties.getAckMode()).isEqualTo(AckMode.MANUAL); + assertThat(containerProperties.isAsyncAcks()).isEqualTo(true); assertThat(containerProperties.getClientId()).isEqualTo("client"); assertThat(containerProperties.getAckCount()).isEqualTo(123); assertThat(containerProperties.getAckTime()).isEqualTo(456L);