Skip to content

Commit c6be29f

Browse files
elizarovqwwdfsad
authored andcommitted
Fix SOE in select with "opposite channels" stress-test
The fix is based on the sequential numbering of atomic select operation. Deadlock is detected and the operation with the lower sequential number is aborted and restarted (with a larger number). Fixes #504
1 parent 3560873 commit c6be29f

File tree

17 files changed

+226
-111
lines changed

17 files changed

+226
-111
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,7 +1049,9 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
10491049
public fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10501050
public fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
10511051
public fun resumeWith (Ljava/lang/Object;)V
1052-
public fun trySelect (Ljava/lang/Object;)Z
1052+
public fun toString ()Ljava/lang/String;
1053+
public fun trySelect ()Z
1054+
public fun trySelectIdempotent (Ljava/lang/Object;)Ljava/lang/Object;
10531055
}
10541056

10551057
public abstract interface class kotlinx/coroutines/selects/SelectClause0 {
@@ -1071,13 +1073,18 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
10711073
public abstract fun performAtomicIfNotSelected (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10721074
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10731075
public abstract fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
1074-
public abstract fun trySelect (Ljava/lang/Object;)Z
1076+
public abstract fun trySelect ()Z
1077+
public abstract fun trySelectIdempotent (Ljava/lang/Object;)Ljava/lang/Object;
10751078
}
10761079

10771080
public final class kotlinx/coroutines/selects/SelectKt {
10781081
public static final fun select (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10791082
}
10801083

1084+
public synthetic class kotlinx/coroutines/selects/SelectKtSelectOpSequenceNumberRefVolatile {
1085+
public fun <init> (J)V
1086+
}
1087+
10811088
public final class kotlinx/coroutines/selects/SelectUnbiasedKt {
10821089
public static final fun selectUnbiased (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10831090
}

kotlinx-coroutines-core/common/src/JobSupport.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
553553
if (select.isSelected) return
554554
if (state !is Incomplete) {
555555
// already complete -- select result
556-
if (select.trySelect(null)) {
556+
if (select.trySelect()) {
557557
block.startCoroutineUnintercepted(select.completion)
558558
}
559559
return
@@ -1181,7 +1181,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
11811181
if (select.isSelected) return
11821182
if (state !is Incomplete) {
11831183
// already complete -- select result
1184-
if (select.trySelect(null)) {
1184+
if (select.trySelect()) {
11851185
if (state is CompletedExceptionally) {
11861186
select.resumeSelectCancellableWithException(state.cause)
11871187
}
@@ -1362,7 +1362,7 @@ private class SelectJoinOnCompletion<R>(
13621362
private val block: suspend () -> R
13631363
) : JobNode<JobSupport>(job) {
13641364
override fun invoke(cause: Throwable?) {
1365-
if (select.trySelect(null))
1365+
if (select.trySelect())
13661366
block.startCoroutineCancellable(select.completion)
13671367
}
13681368
override fun toString(): String = "SelectJoinOnCompletion[$select]"
@@ -1374,7 +1374,7 @@ private class SelectAwaitOnCompletion<T, R>(
13741374
private val block: suspend (T) -> R
13751375
) : JobNode<JobSupport>(job) {
13761376
override fun invoke(cause: Throwable?) {
1377-
if (select.trySelect(null))
1377+
if (select.trySelect())
13781378
job.selectAwaitCompletion(select, block)
13791379
}
13801380
override fun toString(): String = "SelectAwaitOnCompletion[$select]"

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
6161
*/
6262
protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
6363
// offer atomically with select
64-
val offerOp = describeTryOffer(element, select)
64+
val offerOp = describeTryOffer(element)
6565
val failure = select.performAtomicTrySelect(offerOp)
6666
if (failure != null) return failure
6767
val receive = offerOp.result
@@ -322,29 +322,31 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
322322
/**
323323
* @suppress **This is unstable API and it is subject to change.**
324324
*/
325-
protected fun describeTryOffer(element: E, select: SelectInstance<*>): TryOfferDesc<E> =
326-
TryOfferDesc(element, select, queue)
325+
protected fun describeTryOffer(element: E): TryOfferDesc<E> =
326+
TryOfferDesc(element, queue)
327327

328328
/**
329329
* @suppress **This is unstable API and it is subject to change.**
330330
*/
331331
protected class TryOfferDesc<E>(
332332
@JvmField val element: E,
333-
override val select: SelectInstance<*>,
334333
queue: LockFreeLinkedListHead
335-
) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue), SelectInstanceDesc {
334+
) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue), AtomicSelectDesc {
336335
@JvmField var resumeToken: Any? = null
337336

337+
override var selectOp: Any? = null
338+
338339
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
339340
if (affected !is ReceiveOrClosed<*>) return OFFER_FAILED
340341
if (affected is Closed<*>) return affected
341342
return null
342343
}
343344

344-
override fun validatePrepared(node: ReceiveOrClosed<E>): Boolean {
345-
val token = node.tryResumeReceive(element, idempotent = this) ?: return false
345+
override fun validatePrepared(node: ReceiveOrClosed<E>): Any? {
346+
val token = node.tryResumeReceive(element, idempotent = this) ?: return REMOVE_PREPARED
347+
if (token === SELECT_RETRY) return OFFER_FAILED
346348
resumeToken = token
347-
return true
349+
return null
348350
}
349351
}
350352

@@ -446,8 +448,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
446448
@JvmField val select: SelectInstance<R>,
447449
@JvmField val block: suspend (SendChannel<E>) -> R
448450
) : LockFreeLinkedListNode(), Send, DisposableHandle {
449-
override fun tryResumeSend(idempotent: Any?): Any? =
450-
if (select.trySelect(idempotent)) SELECT_STARTED else null
451+
override fun tryResumeSend(idempotent: Any?): Any? = select.trySelectIdempotent(idempotent)
451452

452453
override fun completeResumeSend(token: Any) {
453454
assert { token === SELECT_STARTED }
@@ -463,7 +464,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
463464
}
464465

465466
override fun resumeSendClosed(closed: Closed<*>) {
466-
if (select.trySelect(null))
467+
if (select.trySelect())
467468
select.resumeSelectCancellableWithException(closed.sendException)
468469
}
469470

@@ -523,7 +524,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
523524
*/
524525
protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
525526
// poll atomically with select
526-
val pollOp = describeTryPoll(select)
527+
val pollOp = describeTryPoll()
527528
val failure = select.performAtomicTrySelect(pollOp)
528529
if (failure != null) return failure
529530
val send = pollOp.result
@@ -652,31 +653,33 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
652653
/**
653654
* @suppress **This is unstable API and it is subject to change.**
654655
*/
655-
protected fun describeTryPoll(select: SelectInstance<*>): TryPollDesc<E> =
656-
TryPollDesc(select, queue)
656+
protected fun describeTryPoll(): TryPollDesc<E> =
657+
TryPollDesc(queue)
657658

658659
/**
659660
* @suppress **This is unstable API and it is subject to change.**
660661
*/
661662
protected class TryPollDesc<E>(
662-
override val select: SelectInstance<*>,
663663
queue: LockFreeLinkedListHead
664-
) : RemoveFirstDesc<Send>(queue), SelectInstanceDesc {
665-
@JvmField var resumeToken: Any? = null
664+
) : RemoveFirstDesc<Send>(queue), AtomicSelectDesc {
665+
@JvmField var resumeToken: Any? = null // can be SELECT_RETRY
666666
@JvmField var pollResult: E? = null
667667

668+
override var selectOp: Any? = null
669+
668670
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
669671
if (affected is Closed<*>) return affected
670672
if (affected !is Send) return POLL_FAILED
671673
return null
672674
}
673675

674676
@Suppress("UNCHECKED_CAST")
675-
override fun validatePrepared(node: Send): Boolean {
676-
val token = node.tryResumeSend(idempotent = this) ?: return false
677+
override fun validatePrepared(node: Send): Any? {
678+
val token = node.tryResumeSend(idempotent = this) ?: return REMOVE_PREPARED
679+
if (token === SELECT_RETRY) return POLL_FAILED
677680
resumeToken = token
678681
pollResult = node.pollResult as E
679-
return true
682+
return null
680683
}
681684
}
682685

@@ -752,7 +755,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
752755
pollResult === POLL_FAILED -> {} // retry
753756
pollResult is Closed<*> -> {
754757
if (pollResult.closeCause == null) {
755-
if (select.trySelect(null))
758+
if (select.trySelect())
756759
block.startCoroutineUnintercepted(null, select.completion)
757760
return
758761
} else {
@@ -971,8 +974,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
971974
@JvmField val block: suspend (Any?) -> R,
972975
@JvmField val receiveMode: Int
973976
) : Receive<E>(), DisposableHandle {
974-
override fun tryResumeReceive(value: E, idempotent: Any?): Any? =
975-
if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null
977+
override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
978+
val result = select.trySelectIdempotent(idempotent)
979+
return if (result === SELECT_STARTED) value ?: NULL_VALUE else result
980+
}
976981

977982
@Suppress("UNCHECKED_CAST")
978983
override fun completeResumeReceive(token: Any) {
@@ -981,7 +986,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
981986
}
982987

983988
override fun resumeReceiveClosed(closed: Closed<*>) {
984-
if (!select.trySelect(null)) return
989+
if (!select.trySelect()) return
985990
when (receiveMode) {
986991
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
987992
RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
@@ -1032,10 +1037,6 @@ internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
10321037
@SharedImmutable
10331038
internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
10341039

1035-
@JvmField
1036-
@SharedImmutable
1037-
internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
1038-
10391040
@JvmField
10401041
@SharedImmutable
10411042
internal val NULL_VALUE: Symbol = Symbol("NULL_VALUE")
@@ -1059,6 +1060,7 @@ internal typealias Handler = (Throwable?) -> Unit
10591060
*/
10601061
internal interface Send {
10611062
val pollResult: Any? // E | Closed
1063+
// Returns: null - failure, SELECT_RETRY for retry (only when idempotent != null), otherwise token for completeResumeSend
10621064
fun tryResumeSend(idempotent: Any?): Any?
10631065
fun completeResumeSend(token: Any)
10641066
fun resumeSendClosed(closed: Closed<*>)
@@ -1069,6 +1071,7 @@ internal interface Send {
10691071
*/
10701072
internal interface ReceiveOrClosed<in E> {
10711073
val offerResult: Any // OFFER_SUCCESS | Closed
1074+
// Returns: null - failure, SELECT_RETRY for retry (only when idempotent != null), otherwise token for completeResumeReceive
10721075
fun tryResumeReceive(value: E, idempotent: Any?): Any?
10731076
fun completeResumeReceive(token: Any)
10741077
}

kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ internal class ArrayBroadcastChannel<E>(
105105
val size = this.size
106106
if (size >= capacity) return OFFER_FAILED
107107
// let's try to select sending this element to buffer
108-
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
108+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
109109
return ALREADY_SELECTED
110110
}
111111
val tail = this.tail
@@ -299,7 +299,7 @@ internal class ArrayBroadcastChannel<E>(
299299
result === POLL_FAILED -> { /* just bail out of lock */ }
300300
else -> {
301301
// let's try to select receiving this element from buffer
302-
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
302+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
303303
result = ALREADY_SELECTED
304304
} else {
305305
// update subHead after retrieiving element from buffer

kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ internal open class ArrayChannel<E>(
9494
// check for receivers that were waiting on empty queue
9595
if (size == 0) {
9696
loop@ while (true) {
97-
val offerOp = describeTryOffer(element, select)
97+
val offerOp = describeTryOffer(element)
9898
val failure = select.performAtomicTrySelect(offerOp)
9999
when {
100100
failure == null -> { // offered successfully
@@ -114,7 +114,7 @@ internal open class ArrayChannel<E>(
114114
}
115115
}
116116
// let's try to select sending this element to buffer
117-
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
117+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
118118
this.size = size // restore size
119119
return ALREADY_SELECTED
120120
}
@@ -195,7 +195,7 @@ internal open class ArrayChannel<E>(
195195
var replacement: Any? = POLL_FAILED
196196
if (size == capacity) {
197197
loop@ while (true) {
198-
val pollOp = describeTryPoll(select)
198+
val pollOp = describeTryPoll()
199199
val failure = select.performAtomicTrySelect(pollOp)
200200
when {
201201
failure == null -> { // polled successfully
@@ -226,7 +226,7 @@ internal open class ArrayChannel<E>(
226226
buffer[(head + size) % buffer.size] = replacement
227227
} else {
228228
// failed to poll or is already closed --> let's try to select receiving this element from buffer
229-
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
229+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
230230
this.size = size // restore size
231231
buffer[head] = result // restore head
232232
return ALREADY_SELECTED

kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
273273
}
274274

275275
private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
276-
if (!select.trySelect(null)) return
276+
if (!select.trySelect()) return
277277
offerInternal(element)?.let {
278278
select.resumeSelectCancellableWithException(it.sendException)
279279
return

kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
package kotlinx.coroutines.internal
66

7+
import kotlin.jvm.*
8+
79
/** @suppress **This is unstable API and it is subject to change.** */
810
public expect open class LockFreeLinkedListNode() {
911
public val isRemoved: Boolean
@@ -57,7 +59,7 @@ public expect open class AddLastDesc<T : LockFreeLinkedListNode>(
5759
public expect open class RemoveFirstDesc<T>(queue: LockFreeLinkedListNode): AbstractAtomicDesc {
5860
val queue: LockFreeLinkedListNode
5961
public val result: T
60-
protected open fun validatePrepared(node: T): Boolean
62+
protected open fun validatePrepared(node: T): Any?
6163
protected final override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
6264
final override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
6365
}
@@ -71,3 +73,7 @@ public expect abstract class AbstractAtomicDesc : AtomicDesc {
7173
protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? // non-null on failure
7274
protected abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
7375
}
76+
77+
@JvmField
78+
@SharedImmutable
79+
internal val REMOVE_PREPARED: Any = Symbol("REMOVE_PREPARED")

0 commit comments

Comments
 (0)