Skip to content

Commit 29eb721

Browse files
committed
Make SafeCollector platform-specific declaration and enforce exception transparency invariant on JVM
* Make it in allocation-free manner by using crafty trick with casting KSuspendFunction to Function and pass reusable object as a completion
1 parent 09cb4bf commit 29eb721

File tree

12 files changed

+437
-153
lines changed

12 files changed

+437
-153
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1001,7 +1001,7 @@ public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt {
10011001
public static final fun checkIndexOverflow (I)I
10021002
}
10031003

1004-
public final class kotlinx/coroutines/flow/internal/SafeCollectorKt {
1004+
public final class kotlinx/coroutines/flow/internal/SafeCollector_commonKt {
10051005
public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
10061006
}
10071007

kotlinx-coroutines-core/common/src/flow/Builders.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,12 @@ public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit
5151
// Named anonymous object
5252
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : Flow<T> {
5353
override suspend fun collect(collector: FlowCollector<T>) {
54-
SafeCollector(collector, coroutineContext).block()
54+
val safeCollector = SafeCollector(collector, coroutineContext)
55+
try {
56+
safeCollector.block()
57+
} finally {
58+
safeCollector.release()
59+
}
5560
}
5661
}
5762

kotlinx-coroutines-core/common/src/flow/Flow.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,8 @@ import kotlin.coroutines.*
149149
* it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught"
150150
* by an upstream flow, limiting the ability of local reasoning about the code.
151151
*
152-
* Currently, the flow infrastructure does not enforce exception transparency contracts, however, it might be enforced
153-
* in the future either at run time or at compile time.
152+
* Flow machinery enforces exception transparency at runtime an throws [IllegalStateException] on any attempt to emit value,
153+
* if an exception has been thrown on previous attemp.
154154
*
155155
* ### Reactive streams
156156
*
@@ -199,7 +199,7 @@ public abstract class AbstractFlow<T> : Flow<T> {
199199

200200
@InternalCoroutinesApi
201201
public final override suspend fun collect(collector: FlowCollector<T>) {
202-
collectSafely(SafeCollector(collector, collectContext = coroutineContext))
202+
collectSafely(SafeCollector(collector, coroutineContext))
203203
}
204204

205205
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.internal
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import kotlinx.coroutines.internal.ScopeCoroutine
10+
import kotlin.coroutines.*
11+
import kotlin.jvm.*
12+
13+
internal expect class SafeCollector<T>(
14+
collector: FlowCollector<T>,
15+
collectContext: CoroutineContext
16+
) : FlowCollector<T> {
17+
internal val collector: FlowCollector<T>
18+
internal val collectContext: CoroutineContext
19+
internal val collectContextSize: Int
20+
public fun release()
21+
}
22+
23+
@JvmName("checkContext") // For prettier stack traces
24+
internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) {
25+
val result = currentContext.fold(0) fold@{ count, element ->
26+
val key = element.key
27+
val collectElement = collectContext[key]
28+
if (key !== Job) {
29+
return@fold if (element !== collectElement) Int.MIN_VALUE
30+
else count + 1
31+
}
32+
33+
val collectJob = collectElement as Job?
34+
val emissionParentJob = (element as Job).transitiveCoroutineParent(collectJob)
35+
/*
36+
* Code like
37+
* ```
38+
* coroutineScope {
39+
* launch {
40+
* emit(1)
41+
* }
42+
*
43+
* launch {
44+
* emit(2)
45+
* }
46+
* }
47+
* ```
48+
* is prohibited because 'emit' is not thread-safe by default. Use 'channelFlow' instead if you need concurrent emission
49+
* or want to switch context dynamically (e.g. with `withContext`).
50+
*
51+
* Note that collecting from another coroutine is allowed, e.g.:
52+
* ```
53+
* coroutineScope {
54+
* val channel = produce {
55+
* collect { value ->
56+
* send(value)
57+
* }
58+
* }
59+
* channel.consumeEach { value ->
60+
* emit(value)
61+
* }
62+
* }
63+
* ```
64+
* is a completely valid.
65+
*/
66+
if (emissionParentJob !== collectJob) {
67+
error(
68+
"Flow invariant is violated:\n" +
69+
"\t\tEmission from another coroutine is detected.\n" +
70+
"\t\tChild of $emissionParentJob, expected child of $collectJob.\n" +
71+
"\t\tFlowCollector is not thread-safe and concurrent emissions are prohibited.\n" +
72+
"\t\tTo mitigate this restriction please use 'channelFlow' builder instead of 'flow'"
73+
)
74+
}
75+
76+
/*
77+
* If collect job is null (-> EmptyCoroutineContext, probably run from `suspend fun main`), then invariant is maintained
78+
* (common transitive parent is "null"), but count check will fail, so just do not count job context element when
79+
* flow is collected from EmptyCoroutineContext
80+
*/
81+
if (collectJob == null) count else count + 1
82+
}
83+
if (result != collectContextSize) {
84+
error(
85+
"Flow invariant is violated:\n" +
86+
"\t\tFlow was collected in $collectContext,\n" +
87+
"\t\tbut emission happened in $currentContext.\n" +
88+
"\t\tPlease refer to 'flow' documentation or use 'flowOn' instead"
89+
)
90+
}
91+
}
92+
93+
internal tailrec fun Job?.transitiveCoroutineParent(collectJob: Job?): Job? {
94+
if (this === null) return null
95+
if (this === collectJob) return this
96+
if (this !is ScopeCoroutine<*>) return this
97+
return parent.transitiveCoroutineParent(collectJob)
98+
}
99+
100+
/**
101+
* An analogue of the [flow] builder that does not check the context of execution of the resulting flow.
102+
* Used in our own operators where we trust the context of invocations.
103+
*/
104+
@PublishedApi
105+
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
106+
return object : Flow<T> {
107+
override suspend fun collect(collector: FlowCollector<T>) {
108+
collector.block()
109+
}
110+
}
111+
}

kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt

-124
This file was deleted.

kotlinx-coroutines-core/common/test/TestBase.common.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public suspend inline fun <reified T : Throwable> assertFailsWith(flow: Flow<*>)
5050
flow.collect()
5151
fail("Should be unreached")
5252
} catch (e: Throwable) {
53-
assertTrue(e is T)
53+
assertTrue(e is T, "Expected exception ${T::class}, but had $e instead")
5454
}
5555
}
5656

kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt

+4-23
Original file line numberDiff line numberDiff line change
@@ -59,25 +59,6 @@ class FlowInvariantsTest : TestBase() {
5959
}
6060
}
6161

62-
@Test
63-
fun testCachedInvariantCheckResult() = runParametrizedTest<Int> { flow ->
64-
flow {
65-
emit(1)
66-
try {
67-
withContext(NamedDispatchers("foo")) {
68-
emit(1)
69-
}
70-
fail()
71-
} catch (e: IllegalStateException) {
72-
expect(2)
73-
}
74-
emit(3)
75-
}.collect {
76-
expect(it)
77-
}
78-
finish(4)
79-
}
80-
8162
@Test
8263
fun testWithNameContractViolated() = runParametrizedTest<Int>(IllegalStateException::class) { flow ->
8364
flow {
@@ -146,9 +127,9 @@ class FlowInvariantsTest : TestBase() {
146127
}
147128
}
148129

149-
val flow = flowOf(1)
150-
assertFailsWith<IllegalStateException> { flow.merge(flow).toList() }
151-
assertFailsWith<IllegalStateException> { flow.trickyMerge(flow).toList() }
130+
val flowInstance = flowOf(1)
131+
assertFailsWith<IllegalStateException> { flowInstance.merge(flowInstance).toList() }
132+
assertFailsWith<IllegalStateException> { flowInstance.trickyMerge(flowInstance).toList() }
152133
}
153134

154135
@Test
@@ -237,7 +218,7 @@ class FlowInvariantsTest : TestBase() {
237218
emptyContextTest {
238219
transform {
239220
expect(it)
240-
kotlinx.coroutines.withContext(Dispatchers.Unconfined) {
221+
withContext(Dispatchers.Unconfined) {
241222
emit(it + 1)
242223
}
243224
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlin.test.*
9+
10+
class SafeFlowTest : TestBase() {
11+
12+
@Test
13+
fun testEmissionsFromDifferentStateMachine() = runTest {
14+
val result = flow<Int> {
15+
emit1(1)
16+
emit2(2)
17+
}.onEach { yield() }.toList()
18+
assertEquals(listOf(1, 2), result)
19+
finish(3)
20+
}
21+
22+
private suspend fun FlowCollector<Int>.emit1(expect: Int) {
23+
emit(expect)
24+
expect(expect)
25+
}
26+
27+
private suspend fun FlowCollector<Int>.emit2(expect: Int) {
28+
emit(expect)
29+
expect(expect)
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.internal
6+
7+
import kotlinx.coroutines.flow.*
8+
import kotlin.coroutines.*
9+
10+
internal actual class SafeCollector<T> actual constructor(
11+
internal actual val collector: FlowCollector<T>,
12+
internal actual val collectContext: CoroutineContext
13+
) : FlowCollector<T> {
14+
15+
// Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
16+
internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
17+
private var lastEmissionContext: CoroutineContext? = null
18+
19+
override suspend fun emit(value: T) {
20+
val currentContext = coroutineContext
21+
if (lastEmissionContext !== currentContext) {
22+
checkContext(currentContext)
23+
lastEmissionContext = currentContext
24+
}
25+
collector.emit(value)
26+
}
27+
28+
public actual fun release() {
29+
}
30+
}

0 commit comments

Comments
 (0)