Skip to content

Commit 1171c9d

Browse files
artembilanspring-builds
authored andcommitted
GH-3740: Fix HandlerAdapter to detect Kotlin suspend functions
Fixes: #3740 Issue link: #3740 Even if Kotlin `suspend` functions are called properly, the acknowledgement is not called because this kind of method is not treated as an `asyncReplies` mode * Fix `HandlerAdapter` to check for `KotlinDetector.isSuspendingFunction()` in addition to `CompletableFuture` & `Mono` * Adjust `EnableKafkaKotlinCoroutinesTests.kt` to verify that `acknowledgement` has been called by the Framework # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java (cherry picked from commit 914cf62)
1 parent bbaecbc commit 1171c9d

File tree

2 files changed

+26
-4
lines changed

2 files changed

+26
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.listener.adapter;
1818

19+
20+
import org.springframework.core.KotlinDetector;
1921
import org.springframework.lang.Nullable;
2022
import org.springframework.messaging.Message;
2123
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
@@ -26,6 +28,7 @@
2628
* underlying handler.
2729
*
2830
* @author Gary Russell
31+
* @author Artem Bilan
2932
*
3033
*/
3134
public class HandlerAdapter {
@@ -43,7 +46,10 @@ public class HandlerAdapter {
4346
public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) {
4447
this.invokerHandlerMethod = invokerHandlerMethod;
4548
this.delegatingHandler = null;
46-
this.asyncReplies = AdapterUtils.isAsyncReply(invokerHandlerMethod.getMethod().getReturnType());
49+
Method handlerMethod = invokerHandlerMethod.getMethod();
50+
this.asyncReplies =
51+
AdapterUtils.isAsyncReply(handlerMethod.getReturnType())
52+
|| KotlinDetector.isSuspendingFunction(handlerMethod);
4753
}
4854

4955
/**

spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt

+19-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2024 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
2020
import org.apache.kafka.common.serialization.StringDeserializer
2121
import org.apache.kafka.common.serialization.StringSerializer
2222
import org.assertj.core.api.Assertions.assertThat
23+
import org.awaitility.Awaitility.await
2324
import org.junit.jupiter.api.Test
2425
import org.springframework.beans.factory.annotation.Autowired
2526
import org.springframework.beans.factory.annotation.Value
@@ -29,10 +30,16 @@ import org.springframework.kafka.annotation.EnableKafka
2930
import org.springframework.kafka.annotation.KafkaHandler
3031
import org.springframework.kafka.annotation.KafkaListener
3132
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
32-
import org.springframework.kafka.core.*
33+
import org.springframework.kafka.core.ConsumerFactory
34+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
35+
import org.springframework.kafka.core.DefaultKafkaProducerFactory
36+
import org.springframework.kafka.core.KafkaTemplate
37+
import org.springframework.kafka.core.ProducerFactory
3338
import org.springframework.kafka.listener.KafkaListenerErrorHandler
39+
import org.springframework.kafka.support.Acknowledgment
3440
import org.springframework.kafka.test.EmbeddedKafkaBroker
3541
import org.springframework.kafka.test.context.EmbeddedKafka
42+
import org.springframework.kafka.test.utils.KafkaTestUtils
3643
import org.springframework.messaging.handler.annotation.SendTo
3744
import org.springframework.test.annotation.DirtiesContext
3845
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
@@ -45,6 +52,7 @@ import java.util.concurrent.TimeUnit
4552
* Kotlin Annotated async return listener tests.
4653
*
4754
* @author Wang ZhiYang
55+
* @author Artem Bilan
4856
*
4957
* @since 3.1
5058
*/
@@ -65,6 +73,11 @@ class EnableKafkaKotlinCoroutinesTests {
6573
this.template.send("kotlinAsyncTestTopic1", "foo")
6674
assertThat(this.config.latch1.await(10, TimeUnit.SECONDS)).isTrue()
6775
assertThat(this.config.received).isEqualTo("foo")
76+
await()
77+
.untilAsserted {
78+
assertThat(KafkaTestUtils.getPropertyValue(this.config.acknowledgment, "acked"))
79+
.isEqualTo(java.lang.Boolean.TRUE)
80+
}
6881
}
6982

7083
@Test
@@ -114,6 +127,8 @@ class EnableKafkaKotlinCoroutinesTests {
114127
@Volatile
115128
lateinit var received: String
116129

130+
lateinit var acknowledgment: Acknowledgment
131+
117132
@Volatile
118133
lateinit var batchReceived: String
119134

@@ -204,8 +219,9 @@ class EnableKafkaKotlinCoroutinesTests {
204219

205220
@KafkaListener(id = "kotlin", topics = ["kotlinAsyncTestTopic1"],
206221
containerFactory = "kafkaListenerContainerFactory")
207-
suspend fun listen(value: String) {
222+
suspend fun listen(value: String, acknowledgment: Acknowledgment) {
208223
this.received = value
224+
this.acknowledgment = acknowledgment
209225
this.latch1.countDown()
210226
}
211227

0 commit comments

Comments
 (0)