From 0911ad2b72e76170a2b704ff201fb5d761aa6476 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 13 Feb 2025 14:28:16 -0500 Subject: [PATCH] GH-3740: Fix `HandlerAdapter` to detect Kotlin suspend functions Fixes: #3740 Issue link: https://github.com/spring-projects/spring-kafka/issues/3740 Even if Kotlin `suspend` functions are called properly, the acknowledgement is not called because this kind of method is not treated as an `asyncReplies` mode * Fix `HandlerAdapter` to check for `KotlinDetector.isSuspendingFunction()` in addition to `CompletableFuture` & `Mono` * Adjust `EnableKafkaKotlinCoroutinesTests.kt` to verify that `acknowledgement` has been called by the Framework **Auto-cherry-pick to `3.3.x` & `3.2.x`** --- .../listener/adapter/HandlerAdapter.java | 9 +++++++- .../EnableKafkaKotlinCoroutinesTests.kt | 22 ++++++++++++++++--- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java index 3d244ac76d..1e5a9fff57 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java @@ -16,8 +16,11 @@ package org.springframework.kafka.listener.adapter; +import java.lang.reflect.Method; + import org.jspecify.annotations.Nullable; +import org.springframework.core.KotlinDetector; import org.springframework.messaging.Message; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; @@ -27,6 +30,7 @@ * underlying handler. * * @author Gary Russell + * @author Artem Bilan * */ public class HandlerAdapter { @@ -44,7 +48,10 @@ public class HandlerAdapter { public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) { this.invokerHandlerMethod = invokerHandlerMethod; this.delegatingHandler = null; - this.asyncReplies = AdapterUtils.isAsyncReply(invokerHandlerMethod.getMethod().getReturnType()); + Method handlerMethod = invokerHandlerMethod.getMethod(); + this.asyncReplies = + AdapterUtils.isAsyncReply(handlerMethod.getReturnType()) + || KotlinDetector.isSuspendingFunction(handlerMethod); } /** diff --git a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt index ec60d2bd02..9cdc22f378 100644 --- a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt +++ b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2024 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.serialization.StringSerializer import org.assertj.core.api.Assertions.assertThat +import org.awaitility.Awaitility.await import org.junit.jupiter.api.Test import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Value @@ -29,10 +30,16 @@ import org.springframework.kafka.annotation.EnableKafka import org.springframework.kafka.annotation.KafkaHandler import org.springframework.kafka.annotation.KafkaListener import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory -import org.springframework.kafka.core.* +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory import org.springframework.kafka.listener.KafkaListenerErrorHandler +import org.springframework.kafka.support.Acknowledgment import org.springframework.kafka.test.EmbeddedKafkaBroker import org.springframework.kafka.test.context.EmbeddedKafka +import org.springframework.kafka.test.utils.KafkaTestUtils import org.springframework.messaging.handler.annotation.SendTo import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.junit.jupiter.SpringJUnitConfig @@ -45,6 +52,7 @@ import java.util.concurrent.TimeUnit * Kotlin Annotated async return listener tests. * * @author Wang ZhiYang + * @author Artem Bilan * * @since 3.1 */ @@ -65,6 +73,11 @@ class EnableKafkaKotlinCoroutinesTests { this.template.send("kotlinAsyncTestTopic1", "foo") assertThat(this.config.latch1.await(10, TimeUnit.SECONDS)).isTrue() assertThat(this.config.received).isEqualTo("foo") + await() + .untilAsserted { + assertThat(KafkaTestUtils.getPropertyValue(this.config.acknowledgment, "acked")) + .isEqualTo(java.lang.Boolean.TRUE) + } } @Test @@ -114,6 +127,8 @@ class EnableKafkaKotlinCoroutinesTests { @Volatile lateinit var received: String + lateinit var acknowledgment: Acknowledgment + @Volatile lateinit var batchReceived: String @@ -204,8 +219,9 @@ class EnableKafkaKotlinCoroutinesTests { @KafkaListener(id = "kotlin", topics = ["kotlinAsyncTestTopic1"], containerFactory = "kafkaListenerContainerFactory") - suspend fun listen(value: String) { + suspend fun listen(value: String, acknowledgment: Acknowledgment) { this.received = value + this.acknowledgment = acknowledgment this.latch1.countDown() }