Skip to content

Fix SOE with select onSend/onReceive clauses on the same channel #1417

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,9 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
public fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
public fun resumeWith (Ljava/lang/Object;)V
public fun trySelect (Ljava/lang/Object;)Z
public fun toString ()Ljava/lang/String;
public fun trySelect ()Z
public fun trySelectIdempotent (Ljava/lang/Object;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/selects/SelectClause0 {
Expand All @@ -1047,13 +1049,18 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
public abstract fun performAtomicIfNotSelected (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public abstract fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
public abstract fun trySelect (Ljava/lang/Object;)Z
public abstract fun trySelect ()Z
public abstract fun trySelectIdempotent (Ljava/lang/Object;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/selects/SelectKt {
public static final fun select (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public synthetic class kotlinx/coroutines/selects/SelectKtSelectOpSequenceNumberRefVolatile {
public fun <init> (J)V
}

public final class kotlinx/coroutines/selects/SelectUnbiasedKt {
public static final fun selectUnbiased (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
Expand Down
8 changes: 4 additions & 4 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
if (select.isSelected) return
if (state !is Incomplete) {
// already complete -- select result
if (select.trySelect(null)) {
if (select.trySelect()) {
block.startCoroutineUnintercepted(select.completion)
}
return
Expand Down Expand Up @@ -1181,7 +1181,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
if (select.isSelected) return
if (state !is Incomplete) {
// already complete -- select result
if (select.trySelect(null)) {
if (select.trySelect()) {
if (state is CompletedExceptionally) {
select.resumeSelectCancellableWithException(state.cause)
}
Expand Down Expand Up @@ -1362,7 +1362,7 @@ private class SelectJoinOnCompletion<R>(
private val block: suspend () -> R
) : JobNode<JobSupport>(job) {
override fun invoke(cause: Throwable?) {
if (select.trySelect(null))
if (select.trySelect())
block.startCoroutineCancellable(select.completion)
}
override fun toString(): String = "SelectJoinOnCompletion[$select]"
Expand All @@ -1374,7 +1374,7 @@ private class SelectAwaitOnCompletion<T, R>(
private val block: suspend (T) -> R
) : JobNode<JobSupport>(job) {
override fun invoke(cause: Throwable?) {
if (select.trySelect(null))
if (select.trySelect())
job.selectAwaitCompletion(select, block)
}
override fun toString(): String = "SelectAwaitOnCompletion[$select]"
Expand Down
53 changes: 31 additions & 22 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -322,27 +322,31 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
/**
* @suppress **This is unstable API and it is subject to change.**
*/
protected fun describeTryOffer(element: E): TryOfferDesc<E> = TryOfferDesc(element, queue)
protected fun describeTryOffer(element: E): TryOfferDesc<E> =
TryOfferDesc(element, queue)

/**
* @suppress **This is unstable API and it is subject to change.**
*/
protected class TryOfferDesc<E>(
@JvmField val element: E,
queue: LockFreeLinkedListHead
) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue), AtomicSelectDesc {
@JvmField var resumeToken: Any? = null

override var selectOp: Any? = null

override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
if (affected !is ReceiveOrClosed<*>) return OFFER_FAILED
if (affected is Closed<*>) return affected
return null
}

override fun validatePrepared(node: ReceiveOrClosed<E>): Boolean {
val token = node.tryResumeReceive(element, idempotent = this) ?: return false
override fun validatePrepared(node: ReceiveOrClosed<E>): Any? {
val token = node.tryResumeReceive(element, idempotent = this) ?: return REMOVE_PREPARED
if (token === SELECT_RETRY) return OFFER_FAILED
resumeToken = token
return true
return null
}
}

Expand Down Expand Up @@ -444,8 +448,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
@JvmField val select: SelectInstance<R>,
@JvmField val block: suspend (SendChannel<E>) -> R
) : LockFreeLinkedListNode(), Send, DisposableHandle {
override fun tryResumeSend(idempotent: Any?): Any? =
if (select.trySelect(idempotent)) SELECT_STARTED else null
override fun tryResumeSend(idempotent: Any?): Any? = select.trySelectIdempotent(idempotent)

override fun completeResumeSend(token: Any) {
assert { token === SELECT_STARTED }
Expand All @@ -461,7 +464,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
}

override fun resumeSendClosed(closed: Closed<*>) {
if (select.trySelect(null))
if (select.trySelect())
select.resumeSelectCancellableWithException(closed.sendException)
}

Expand Down Expand Up @@ -650,27 +653,33 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
/**
* @suppress **This is unstable API and it is subject to change.**
*/
protected fun describeTryPoll(): TryPollDesc<E> = TryPollDesc(queue)
protected fun describeTryPoll(): TryPollDesc<E> =
TryPollDesc(queue)

/**
* @suppress **This is unstable API and it is subject to change.**
*/
protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
@JvmField var resumeToken: Any? = null
protected class TryPollDesc<E>(
queue: LockFreeLinkedListHead
) : RemoveFirstDesc<Send>(queue), AtomicSelectDesc {
@JvmField var resumeToken: Any? = null // can be SELECT_RETRY
@JvmField var pollResult: E? = null

override var selectOp: Any? = null

override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
if (affected is Closed<*>) return affected
if (affected !is Send) return POLL_FAILED
return null
}

@Suppress("UNCHECKED_CAST")
override fun validatePrepared(node: Send): Boolean {
val token = node.tryResumeSend(idempotent = this) ?: return false
override fun validatePrepared(node: Send): Any? {
val token = node.tryResumeSend(idempotent = this) ?: return REMOVE_PREPARED
if (token === SELECT_RETRY) return POLL_FAILED
resumeToken = token
pollResult = node.pollResult as E
return true
return null
}
}

Expand Down Expand Up @@ -746,7 +755,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
pollResult === POLL_FAILED -> {} // retry
pollResult is Closed<*> -> {
if (pollResult.closeCause == null) {
if (select.trySelect(null))
if (select.trySelect())
block.startCoroutineUnintercepted(null, select.completion)
return
} else {
Expand Down Expand Up @@ -965,8 +974,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
@JvmField val block: suspend (Any?) -> R,
@JvmField val receiveMode: Int
) : Receive<E>(), DisposableHandle {
override fun tryResumeReceive(value: E, idempotent: Any?): Any? =
if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null
override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
val result = select.trySelectIdempotent(idempotent)
return if (result === SELECT_STARTED) value ?: NULL_VALUE else result
}

@Suppress("UNCHECKED_CAST")
override fun completeResumeReceive(token: Any) {
Expand All @@ -975,7 +986,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}

override fun resumeReceiveClosed(closed: Closed<*>) {
if (!select.trySelect(null)) return
if (!select.trySelect()) return
when (receiveMode) {
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
Expand Down Expand Up @@ -1026,10 +1037,6 @@ internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
@SharedImmutable
internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")

@JvmField
@SharedImmutable
internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED")

@JvmField
@SharedImmutable
internal val NULL_VALUE: Symbol = Symbol("NULL_VALUE")
Expand All @@ -1053,6 +1060,7 @@ internal typealias Handler = (Throwable?) -> Unit
*/
internal interface Send {
val pollResult: Any? // E | Closed
// Returns: null - failure, SELECT_RETRY for retry (only when idempotent != null), otherwise token for completeResumeSend
fun tryResumeSend(idempotent: Any?): Any?
fun completeResumeSend(token: Any)
fun resumeSendClosed(closed: Closed<*>)
Expand All @@ -1063,6 +1071,7 @@ internal interface Send {
*/
internal interface ReceiveOrClosed<in E> {
val offerResult: Any // OFFER_SUCCESS | Closed
// Returns: null - failure, SELECT_RETRY for retry (only when idempotent != null), otherwise token for completeResumeReceive
fun tryResumeReceive(value: E, idempotent: Any?): Any?
fun completeResumeReceive(token: Any)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ internal class ArrayBroadcastChannel<E>(
val size = this.size
if (size >= capacity) return OFFER_FAILED
// let's try to select sending this element to buffer
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
return ALREADY_SELECTED
}
val tail = this.tail
Expand Down Expand Up @@ -299,7 +299,7 @@ internal class ArrayBroadcastChannel<E>(
result === POLL_FAILED -> { /* just bail out of lock */ }
else -> {
// let's try to select receiving this element from buffer
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
result = ALREADY_SELECTED
} else {
// update subHead after retrieiving element from buffer
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ internal open class ArrayChannel<E>(
}
}
// let's try to select sending this element to buffer
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
this.size = size // restore size
return ALREADY_SELECTED
}
Expand Down Expand Up @@ -206,7 +206,7 @@ internal open class ArrayChannel<E>(
buffer[(head + size) % capacity] = replacement
} else {
// failed to poll or is already closed --> let's try to select receiving this element from buffer
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
this.size = size // restore size
buffer[head] = result // restore head
return ALREADY_SELECTED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
}

private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
if (!select.trySelect(null)) return
if (!select.trySelect()) return
offerInternal(element)?.let {
select.resumeSelectCancellableWithException(it.sendException)
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package kotlinx.coroutines.internal

import kotlin.jvm.*

/** @suppress **This is unstable API and it is subject to change.** */
public expect open class LockFreeLinkedListNode() {
public val isRemoved: Boolean
Expand Down Expand Up @@ -57,7 +59,7 @@ public expect open class AddLastDesc<T : LockFreeLinkedListNode>(
public expect open class RemoveFirstDesc<T>(queue: LockFreeLinkedListNode): AbstractAtomicDesc {
val queue: LockFreeLinkedListNode
public val result: T
protected open fun validatePrepared(node: T): Boolean
protected open fun validatePrepared(node: T): Any?
protected final override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
final override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
}
Expand All @@ -71,3 +73,7 @@ public expect abstract class AbstractAtomicDesc : AtomicDesc {
protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? // non-null on failure
protected abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
}

@JvmField
@SharedImmutable
internal val REMOVE_PREPARED: Any = Symbol("REMOVE_PREPARED")
Loading