@@ -56,7 +56,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
56
56
57
57
/* *
58
58
* Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
59
- * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
59
+ * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | RETRY_ATOMIC | Closed`.
60
60
* @suppress **This is unstable API and it is subject to change.**
61
61
*/
62
62
protected open fun offerSelectInternal (element : E , select : SelectInstance <* >): Any {
@@ -345,10 +345,11 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
345
345
else -> null
346
346
}
347
347
348
- override fun validatePrepared (node : ReceiveOrClosed <E >): Boolean {
349
- val token = node.tryResumeReceive(element, idempotent = this ) ? : return false
348
+ override fun validatePrepared (node : ReceiveOrClosed <E >): Any? {
349
+ val token = node.tryResumeReceive(element, idempotent = this ) ? : return REMOVE_PREPARED
350
+ if (token == = RETRY_ATOMIC ) return RETRY_ATOMIC
350
351
resumeToken = token
351
- return true
352
+ return null
352
353
}
353
354
}
354
355
@@ -384,6 +385,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
384
385
when {
385
386
offerResult == = ALREADY_SELECTED -> return
386
387
offerResult == = OFFER_FAILED -> {} // retry
388
+ offerResult == = RETRY_ATOMIC -> {} // retry
387
389
offerResult == = OFFER_SUCCESS -> {
388
390
block.startCoroutineUnintercepted(receiver = this , completion = select.completion)
389
391
return
@@ -437,7 +439,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
437
439
@JvmField val block : suspend (SendChannel <E >) -> R
438
440
) : Send(), DisposableHandle {
439
441
override fun tryResumeSend (idempotent : Any? ): Any? =
440
- if ( select.trySelect (idempotent)) SELECT_STARTED else null
442
+ select.trySelectIdempotent (idempotent)
441
443
442
444
override fun completeResumeSend (token : Any ) {
443
445
assert { token == = SELECT_STARTED }
@@ -449,7 +451,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
449
451
}
450
452
451
453
override fun resumeSendClosed (closed : Closed <* >) {
452
- if (select.trySelect(null ))
454
+ if (select.trySelect())
453
455
select.resumeSelectCancellableWithException(closed.sendException)
454
456
}
455
457
@@ -504,7 +506,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
504
506
505
507
/* *
506
508
* Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
507
- * Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
509
+ * Return type is `ALREADY_SELECTED | E | POLL_FAILED | RETRY_ATOMIC | Closed`
508
510
* @suppress **This is unstable API and it is subject to change.**
509
511
*/
510
512
protected open fun pollSelectInternal (select : SelectInstance <* >): Any? {
@@ -654,11 +656,12 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
654
656
}
655
657
656
658
@Suppress(" UNCHECKED_CAST" )
657
- override fun validatePrepared (node : Send ): Boolean {
658
- val token = node.tryResumeSend(idempotent = this ) ? : return false
659
+ override fun validatePrepared (node : Send ): Any? {
660
+ val token = node.tryResumeSend(idempotent = this ) ? : return REMOVE_PREPARED
661
+ if (token == = RETRY_ATOMIC ) return RETRY_ATOMIC
659
662
resumeToken = token
660
663
pollResult = node.pollResult as E
661
- return true
664
+ return null
662
665
}
663
666
}
664
667
@@ -680,6 +683,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
680
683
when {
681
684
pollResult == = ALREADY_SELECTED -> return
682
685
pollResult == = POLL_FAILED -> {} // retry
686
+ pollResult == = RETRY_ATOMIC -> {}
683
687
pollResult is Closed <* > -> throw recoverStackTrace(pollResult.receiveException)
684
688
else -> {
685
689
block.startCoroutineUnintercepted(pollResult as E , select.completion)
@@ -708,9 +712,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
708
712
when {
709
713
pollResult == = ALREADY_SELECTED -> return
710
714
pollResult == = POLL_FAILED -> {} // retry
715
+ pollResult == = RETRY_ATOMIC -> {} // retry
711
716
pollResult is Closed <* > -> {
712
717
if (pollResult.closeCause == null ) {
713
- if (select.trySelect(null ))
718
+ if (select.trySelect())
714
719
block.startCoroutineUnintercepted(null , select.completion)
715
720
return
716
721
} else {
@@ -745,6 +750,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
745
750
when {
746
751
pollResult == = ALREADY_SELECTED -> return
747
752
pollResult == = POLL_FAILED -> {} // retry
753
+ pollResult == = RETRY_ATOMIC -> {} // retry
748
754
pollResult is Closed <* > -> {
749
755
block.startCoroutineUnintercepted(ValueOrClosed .closed(pollResult.closeCause), select.completion)
750
756
}
@@ -927,8 +933,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
927
933
@JvmField val block : suspend (Any? ) -> R ,
928
934
@JvmField val receiveMode : Int
929
935
) : Receive<E>(), DisposableHandle {
930
- override fun tryResumeReceive (value : E , idempotent : Any? ): Any? =
931
- if (select.trySelect(idempotent)) (value ? : NULL_VALUE ) else null
936
+ override fun tryResumeReceive (value : E , idempotent : Any? ): Any? {
937
+ val result = select.trySelectIdempotent(idempotent)
938
+ return if (result == = SELECT_STARTED ) value ? : NULL_VALUE else result
939
+ }
932
940
933
941
@Suppress(" UNCHECKED_CAST" )
934
942
override fun completeResumeReceive (token : Any ) {
@@ -937,7 +945,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
937
945
}
938
946
939
947
override fun resumeReceiveClosed (closed : Closed <* >) {
940
- if (! select.trySelect(null )) return
948
+ if (! select.trySelect()) return
941
949
when (receiveMode) {
942
950
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
943
951
RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed .closed<R >(closed.closeCause), select.completion)
@@ -984,10 +992,6 @@ internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
984
992
@SharedImmutable
985
993
internal val ENQUEUE_FAILED : Any = Symbol (" ENQUEUE_FAILED" )
986
994
987
- @JvmField
988
- @SharedImmutable
989
- internal val SELECT_STARTED : Any = Symbol (" SELECT_STARTED" )
990
-
991
995
@JvmField
992
996
@SharedImmutable
993
997
internal val NULL_VALUE : Symbol = Symbol (" NULL_VALUE" )
@@ -1011,6 +1015,7 @@ internal typealias Handler = (Throwable?) -> Unit
1011
1015
*/
1012
1016
internal abstract class Send : LockFreeLinkedListNode () {
1013
1017
abstract val pollResult: Any? // E | Closed
1018
+ // Returns: null - failure, RETRY_ATOMIC for retry (only when idempotent != null), otherwise token for completeResumeSend
1014
1019
abstract fun tryResumeSend (idempotent : Any? ): Any?
1015
1020
abstract fun completeResumeSend (token : Any )
1016
1021
abstract fun resumeSendClosed (closed : Closed <* >)
@@ -1021,6 +1026,7 @@ internal abstract class Send : LockFreeLinkedListNode() {
1021
1026
*/
1022
1027
internal interface ReceiveOrClosed <in E > {
1023
1028
val offerResult: Any // OFFER_SUCCESS | Closed
1029
+ // Returns: null - failure, RETRY_ATOMIC for retry (only when idempotent != null), otherwise token for completeResumeReceive
1024
1030
fun tryResumeReceive (value : E , idempotent : Any? ): Any?
1025
1031
fun completeResumeReceive (token : Any )
1026
1032
}
0 commit comments