From 1ed4640c904843caabf9b27dde39aa099f766b68 Mon Sep 17 00:00:00 2001 From: Steve Fan <29133953+stevefan1999-personal@users.noreply.github.com> Date: Fri, 2 Oct 2020 13:44:40 +0800 Subject: [PATCH] workaround for receiveOrClosed is an abstract method issue --- .../common/src/flow/Channels.kt | 46 ++++++++++++++----- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/Channels.kt b/kotlinx-coroutines-core/common/src/flow/Channels.kt index 2d3ef95aa1..49a8e1bb64 100644 --- a/kotlinx-coroutines-core/common/src/flow/Channels.kt +++ b/kotlinx-coroutines-core/common/src/flow/Channels.kt @@ -33,6 +33,11 @@ private suspend fun FlowCollector.emitAllImpl(channel: ReceiveChannel, // See https://youtrack.jetbrains.com/issue/KT-16222 // See https://github.com/Kotlin/kotlinx.coroutines/issues/1333 var cause: Throwable? = null + + // Fixes a bug where receiveOrClosed is an abstract method, + // this is a problem in the implementation of vertx-lang-kotlin for toChannel + // See https://github.com/vert-x3/vertx-lang-kotlin/blob/4bdf42202ff55ab0d4db079f90ad0f93a30a7b33/vertx-lang-kotlin-coroutines/src/main/java/io/vertx/kotlin/coroutines/ReceiveChannelHandler.kt#L142-L161 + var hasReceiveOrClosed = true try { while (true) { // :KLUDGE: This "run" call is resolved to an extension function "run" and forces the size of @@ -44,18 +49,37 @@ private suspend fun FlowCollector.emitAllImpl(channel: ReceiveChannel, // L$1 <- channel // L$2 <- cause // L$3 <- this$run (actually equal to this) - val result = run { channel.receiveOrClosed() } - if (result.isClosed) { - result.closeCause?.let { throw it } - break // returns normally when result.closeCause == null + if (hasReceiveOrClosed) { + val result = run { channel.receiveOrClosed() }.onFailure { + if (it is AbstractMethodError) { + hasReceiveOrClosed = false + } else { + throw it + } + } + // We don't have receiveOrClosed + if (!hasReceiveOrClosed) continue + + if (result.isClosed) { + result.closeCause?.let { throw it } + break // returns normally when result.closeCause == null + } + // result is spilled here to the coroutine state and retained after the call, even though + // it is not actually needed in the next loop iteration. + // L$0 <- this + // L$1 <- channel + // L$2 <- cause + // L$3 <- result + emit(result.value) + } else { + if (!channel.isClosedForReceive) { + val result = run { channel.receive() } + // Result is also seemingly spilled here as explained above + emit(result) + } else { + break + } } - // result is spilled here to the coroutine state and retained after the call, even though - // it is not actually needed in the next loop iteration. - // L$0 <- this - // L$1 <- channel - // L$2 <- cause - // L$3 <- result - emit(result.value) } } catch (e: Throwable) { cause = e