Skip to content

Commit 4738902

Browse files
committed
~merge SafeCollector and ContinuationImpl, properly release intercepted continuation
1 parent 29eb721 commit 4738902

File tree

7 files changed

+48
-42
lines changed

7 files changed

+48
-42
lines changed

kotlinx-coroutines-core/common/src/flow/Builders.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit
5555
try {
5656
safeCollector.block()
5757
} finally {
58-
safeCollector.release()
58+
safeCollector.releaseIntercepted()
5959
}
6060
}
6161
}

kotlinx-coroutines-core/common/src/flow/Flow.kt

+9-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
package kotlinx.coroutines.flow
66

77
import kotlinx.coroutines.*
8-
import kotlinx.coroutines.flow.internal.SafeCollector
8+
import kotlinx.coroutines.flow.internal.*
99
import kotlin.coroutines.*
1010

1111
/**
@@ -149,8 +149,8 @@ import kotlin.coroutines.*
149149
* it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught"
150150
* by an upstream flow, limiting the ability of local reasoning about the code.
151151
*
152-
* Flow machinery enforces exception transparency at runtime an throws [IllegalStateException] on any attempt to emit value,
153-
* if an exception has been thrown on previous attemp.
152+
* Flow machinery enforces exception transparency at runtime and throws [IllegalStateException] on any attempt to emit a value,
153+
* if an exception has been thrown on previous attempt.
154154
*
155155
* ### Reactive streams
156156
*
@@ -199,7 +199,12 @@ public abstract class AbstractFlow<T> : Flow<T> {
199199

200200
@InternalCoroutinesApi
201201
public final override suspend fun collect(collector: FlowCollector<T>) {
202-
collectSafely(SafeCollector(collector, coroutineContext))
202+
val safeCollector = SafeCollector(collector, coroutineContext)
203+
try {
204+
collectSafely(safeCollector)
205+
} finally {
206+
safeCollector.releaseIntercepted()
207+
}
203208
}
204209

205210
/**

kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.common.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ internal expect class SafeCollector<T>(
1717
internal val collector: FlowCollector<T>
1818
internal val collectContext: CoroutineContext
1919
internal val collectContextSize: Int
20-
public fun release()
20+
public fun releaseIntercepted()
2121
}
2222

2323
@JvmName("checkContext") // For prettier stack traces

kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt

+12-2
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,12 @@ internal inline fun <T, R> Flow<T>.unsafeTransform(
7171
public fun <T> Flow<T>.onStart(
7272
action: suspend FlowCollector<T>.() -> Unit
7373
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action
74-
SafeCollector<T>(this, coroutineContext).action()
74+
val safeCollector = SafeCollector<T>(this, coroutineContext)
75+
try {
76+
safeCollector.action()
77+
} finally {
78+
safeCollector.releaseIntercepted()
79+
}
7580
collect(this) // directly delegate
7681
}
7782

@@ -141,7 +146,12 @@ public fun <T> Flow<T>.onCompletion(
141146
throw e
142147
}
143148
// Exception from the upstream or normal completion
144-
SafeCollector(this, coroutineContext).invokeSafely(action, exception)
149+
val safeCollector = SafeCollector(this, coroutineContext)
150+
try {
151+
safeCollector.invokeSafely(action, exception)
152+
} finally {
153+
safeCollector.releaseIntercepted()
154+
}
145155
exception?.let { throw it }
146156
}
147157

kotlinx-coroutines-core/js/src/flow/internal/SafeCollector.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,6 @@ internal actual class SafeCollector<T> actual constructor(
2525
collector.emit(value)
2626
}
2727

28-
public actual fun release() {
28+
public actual fun releaseIntercepted() {
2929
}
3030
}

kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt

+23-32
Original file line numberDiff line numberDiff line change
@@ -10,41 +10,36 @@ import kotlin.coroutines.intrinsics.*
1010
import kotlin.coroutines.jvm.internal.*
1111

1212
@Suppress("UNCHECKED_CAST")
13+
private val emitFun =
14+
FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
15+
/*
16+
* Implementor of ContinuationImpl (that will be preserved as ABI nearly forever)
17+
* in order to properly control 'intercepted()' lifecycle.
18+
*/
19+
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST")
1320
internal actual class SafeCollector<T> actual constructor(
1421
@JvmField internal actual val collector: FlowCollector<T>,
1522
@JvmField internal actual val collectContext: CoroutineContext
16-
) : FlowCollector<T> {
23+
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext) {
1724

1825
@JvmField // Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
1926
internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
2027
private var lastEmissionContext: CoroutineContext? = null
21-
private var intercepted: InterceptedContinuationOwner = InterceptedContinuationOwner()
22-
private val emitFun = run {
23-
collector::emit as Function2<T, Continuation<Unit>, Any?>
24-
}
28+
private var completion: Continuation<Unit>? = null
2529

26-
/*
27-
* Implementor of ContinuationImpl (that will be preserved by ABI implementation nearly forever)
28-
* in order to properly control 'intercepted()' lifecycle.
29-
*/
30-
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
31-
private inner class InterceptedContinuationOwner : ContinuationImpl(NoOpContinuation, EmptyCoroutineContext) {
32-
@JvmField
33-
public var realCompletion: Continuation<Unit>? = null
34-
35-
override val context: CoroutineContext
36-
get() = realCompletion?.context ?: EmptyCoroutineContext
37-
38-
override fun invokeSuspend(result: Result<Any?>): Any? {
39-
result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
40-
realCompletion?.resumeWith(result as Result<Unit>)
41-
return COROUTINE_SUSPENDED
42-
}
30+
// ContinuationImpl
31+
override val context: CoroutineContext
32+
get() = completion?.context ?: EmptyCoroutineContext
4333

44-
// Escalate visibility to manually release intercepted continuation
45-
public override fun releaseIntercepted() {
46-
super.releaseIntercepted()
47-
}
34+
override fun invokeSuspend(result: Result<Any?>): Any? {
35+
result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
36+
completion?.resumeWith(result as Result<Unit>)
37+
return COROUTINE_SUSPENDED
38+
}
39+
40+
// Escalate visibility to manually release intercepted continuation
41+
public actual override fun releaseIntercepted() {
42+
super.releaseIntercepted()
4843
}
4944

5045
/**
@@ -72,8 +67,8 @@ internal actual class SafeCollector<T> actual constructor(
7267
if (previousContext !== currentContext) {
7368
checkContext(currentContext, previousContext, value)
7469
}
75-
intercepted.realCompletion = uCont
76-
return emitFun(value, intercepted as Continuation<Unit>)
70+
completion = uCont
71+
return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
7772
}
7873

7974
private fun checkContext(
@@ -88,10 +83,6 @@ internal actual class SafeCollector<T> actual constructor(
8883
lastEmissionContext = currentContext
8984
}
9085

91-
public actual fun release() {
92-
intercepted.releaseIntercepted()
93-
}
94-
9586
private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
9687
/*
9788
* Exception transparency ensures that if a `collect` block or any intermediate operator

kotlinx-coroutines-core/native/src/flow/internal/SafeCollector.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,6 @@ internal actual class SafeCollector<T> actual constructor(
2525
collector.emit(value)
2626
}
2727

28-
public actual fun release() {
28+
public actual fun releaseIntercepted() {
2929
}
3030
}

0 commit comments

Comments
 (0)