Skip to content

Commit b0db30c

Browse files
authored
GH-3740: Fix HandlerAdapter to detect Kotlin suspend functions (#3742)
Fixes: #3740 Issue link: #3740 Even if Kotlin `suspend` functions are called properly, the acknowledgement is not called because this kind of method is not treated as an `asyncReplies` mode * Fix `HandlerAdapter` to check for `KotlinDetector.isSuspendingFunction()` in addition to `CompletableFuture` & `Mono` * Adjust `EnableKafkaKotlinCoroutinesTests.kt` to verify that `acknowledgement` has been called by the Framework **Auto-cherry-pick to `3.3.x` & `3.2.x`**
1 parent a34508e commit b0db30c

File tree

2 files changed

+27
-4
lines changed

2 files changed

+27
-4
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616

1717
package org.springframework.kafka.listener.adapter;
1818

19+
import java.lang.reflect.Method;
20+
1921
import org.jspecify.annotations.Nullable;
2022

23+
import org.springframework.core.KotlinDetector;
2124
import org.springframework.messaging.Message;
2225
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
2326

@@ -27,6 +30,7 @@
2730
* underlying handler.
2831
*
2932
* @author Gary Russell
33+
* @author Artem Bilan
3034
*
3135
*/
3236
public class HandlerAdapter {
@@ -44,7 +48,10 @@ public class HandlerAdapter {
4448
public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) {
4549
this.invokerHandlerMethod = invokerHandlerMethod;
4650
this.delegatingHandler = null;
47-
this.asyncReplies = AdapterUtils.isAsyncReply(invokerHandlerMethod.getMethod().getReturnType());
51+
Method handlerMethod = invokerHandlerMethod.getMethod();
52+
this.asyncReplies =
53+
AdapterUtils.isAsyncReply(handlerMethod.getReturnType())
54+
|| KotlinDetector.isSuspendingFunction(handlerMethod);
4855
}
4956

5057
/**

Diff for: spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt

+19-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2024 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
2020
import org.apache.kafka.common.serialization.StringDeserializer
2121
import org.apache.kafka.common.serialization.StringSerializer
2222
import org.assertj.core.api.Assertions.assertThat
23+
import org.awaitility.Awaitility.await
2324
import org.junit.jupiter.api.Test
2425
import org.springframework.beans.factory.annotation.Autowired
2526
import org.springframework.beans.factory.annotation.Value
@@ -29,10 +30,16 @@ import org.springframework.kafka.annotation.EnableKafka
2930
import org.springframework.kafka.annotation.KafkaHandler
3031
import org.springframework.kafka.annotation.KafkaListener
3132
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
32-
import org.springframework.kafka.core.*
33+
import org.springframework.kafka.core.ConsumerFactory
34+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
35+
import org.springframework.kafka.core.DefaultKafkaProducerFactory
36+
import org.springframework.kafka.core.KafkaTemplate
37+
import org.springframework.kafka.core.ProducerFactory
3338
import org.springframework.kafka.listener.KafkaListenerErrorHandler
39+
import org.springframework.kafka.support.Acknowledgment
3440
import org.springframework.kafka.test.EmbeddedKafkaBroker
3541
import org.springframework.kafka.test.context.EmbeddedKafka
42+
import org.springframework.kafka.test.utils.KafkaTestUtils
3643
import org.springframework.messaging.handler.annotation.SendTo
3744
import org.springframework.test.annotation.DirtiesContext
3845
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
@@ -45,6 +52,7 @@ import java.util.concurrent.TimeUnit
4552
* Kotlin Annotated async return listener tests.
4653
*
4754
* @author Wang ZhiYang
55+
* @author Artem Bilan
4856
*
4957
* @since 3.1
5058
*/
@@ -65,6 +73,11 @@ class EnableKafkaKotlinCoroutinesTests {
6573
this.template.send("kotlinAsyncTestTopic1", "foo")
6674
assertThat(this.config.latch1.await(10, TimeUnit.SECONDS)).isTrue()
6775
assertThat(this.config.received).isEqualTo("foo")
76+
await()
77+
.untilAsserted {
78+
assertThat(KafkaTestUtils.getPropertyValue(this.config.acknowledgment, "acked"))
79+
.isEqualTo(java.lang.Boolean.TRUE)
80+
}
6881
}
6982

7083
@Test
@@ -114,6 +127,8 @@ class EnableKafkaKotlinCoroutinesTests {
114127
@Volatile
115128
lateinit var received: String
116129

130+
lateinit var acknowledgment: Acknowledgment
131+
117132
@Volatile
118133
lateinit var batchReceived: String
119134

@@ -204,8 +219,9 @@ class EnableKafkaKotlinCoroutinesTests {
204219

205220
@KafkaListener(id = "kotlin", topics = ["kotlinAsyncTestTopic1"],
206221
containerFactory = "kafkaListenerContainerFactory")
207-
suspend fun listen(value: String) {
222+
suspend fun listen(value: String, acknowledgment: Acknowledgment) {
208223
this.received = value
224+
this.acknowledgment = acknowledgment
209225
this.latch1.countDown()
210226
}
211227

0 commit comments

Comments
 (0)