Skip to content

GH-3740: Fix HandlerAdapter to detect Kotlin suspend functions #3742

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@

package org.springframework.kafka.listener.adapter;

import java.lang.reflect.Method;

import org.jspecify.annotations.Nullable;

import org.springframework.core.KotlinDetector;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;

Expand All @@ -27,6 +30,7 @@
* underlying handler.
*
* @author Gary Russell
* @author Artem Bilan
*
*/
public class HandlerAdapter {
Expand All @@ -44,7 +48,10 @@ public class HandlerAdapter {
public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) {
this.invokerHandlerMethod = invokerHandlerMethod;
this.delegatingHandler = null;
this.asyncReplies = AdapterUtils.isAsyncReply(invokerHandlerMethod.getMethod().getReturnType());
Method handlerMethod = invokerHandlerMethod.getMethod();
this.asyncReplies =
AdapterUtils.isAsyncReply(handlerMethod.getReturnType())
|| KotlinDetector.isSuspendingFunction(handlerMethod);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2024 the original author or authors.
* Copyright 2016-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
Expand All @@ -29,10 +30,16 @@ import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.annotation.KafkaHandler
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.*
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.listener.KafkaListenerErrorHandler
import org.springframework.kafka.support.Acknowledgment
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.kafka.test.utils.KafkaTestUtils
import org.springframework.messaging.handler.annotation.SendTo
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
Expand All @@ -45,6 +52,7 @@ import java.util.concurrent.TimeUnit
* Kotlin Annotated async return listener tests.
*
* @author Wang ZhiYang
* @author Artem Bilan
*
* @since 3.1
*/
Expand All @@ -65,6 +73,11 @@ class EnableKafkaKotlinCoroutinesTests {
this.template.send("kotlinAsyncTestTopic1", "foo")
assertThat(this.config.latch1.await(10, TimeUnit.SECONDS)).isTrue()
assertThat(this.config.received).isEqualTo("foo")
await()
.untilAsserted {
assertThat(KafkaTestUtils.getPropertyValue(this.config.acknowledgment, "acked"))
.isEqualTo(java.lang.Boolean.TRUE)
}
}

@Test
Expand Down Expand Up @@ -114,6 +127,8 @@ class EnableKafkaKotlinCoroutinesTests {
@Volatile
lateinit var received: String

lateinit var acknowledgment: Acknowledgment

@Volatile
lateinit var batchReceived: String

Expand Down Expand Up @@ -204,8 +219,9 @@ class EnableKafkaKotlinCoroutinesTests {

@KafkaListener(id = "kotlin", topics = ["kotlinAsyncTestTopic1"],
containerFactory = "kafkaListenerContainerFactory")
suspend fun listen(value: String) {
suspend fun listen(value: String, acknowledgment: Acknowledgment) {
this.received = value
this.acknowledgment = acknowledgment
this.latch1.countDown()
}

Expand Down