Skip to content

Commit 345458b

Browse files
elizarovqwwdfsad
authored andcommitted
ReceiveChannel.receiveAsFlow extension (#1731)
Experimental ReceiveChannel.receiveAsFlow extension convert channel to flow in fan-out fashion allowing for multi-use. * Also, ReceiveChannel.consumeAsFlow is promoted to experimental from preview Fixes #1490
1 parent fe15b6d commit 345458b

File tree

3 files changed

+93
-14
lines changed

3 files changed

+93
-14
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+1
Original file line numberDiff line numberDiff line change
@@ -943,6 +943,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
943943
public static final fun onStart (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
944944
public static final fun produceIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
945945
public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
946+
public static final fun receiveAsFlow (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/flow/Flow;
946947
public static final fun reduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
947948
public static final synthetic fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
948949
public static final fun retry (Lkotlinx/coroutines/flow/Flow;JLkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/flow/Channels.kt

+44-14
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
2323
* This function provides a more efficient shorthand for `channel.consumeEach { value -> emit(value) }`.
2424
* See [consumeEach][ReceiveChannel.consumeEach].
2525
*/
26-
@ExperimentalCoroutinesApi
27-
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>) {
26+
@ExperimentalCoroutinesApi // since version 1.3.0
27+
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>) =
28+
emitAllImpl(channel, consume = true)
29+
30+
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
2831
// Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveOrClosed".
2932
// It has smaller and more efficient spilled state which also allows to implement a manual kludge to
3033
// fix retention of the last emitted value.
@@ -59,51 +62,78 @@ public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>) {
5962
cause = e
6063
throw e
6164
} finally {
62-
channel.cancelConsumed(cause)
65+
if (consume) channel.cancelConsumed(cause)
6366
}
6467
}
6568

69+
/**
70+
* Represents the given receive channel as a hot flow and [receives][ReceiveChannel.receive] from the channel
71+
* in fan-out fashion every time this flow is collected. One element will be emitted to one collector only.
72+
*
73+
* See also [consumeAsFlow] which ensures that the resulting flow is collected just once.
74+
*
75+
* ### Cancellation semantics
76+
*
77+
* * Flow collectors are cancelled when the original channel is [closed][SendChannel.close] with an exception.
78+
* * Flow collectors complete normally when the original channel is [closed][SendChannel.close] normally.
79+
* * Failure or cancellation of the flow collector does not affect the channel.
80+
*
81+
* ### Operator fusion
82+
*
83+
* Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `receiveAsFlow` are fused.
84+
* In particular, [produceIn] returns the original channel.
85+
* Calls to [flowOn] have generally no effect, unless [buffer] is used to explicitly request buffering.
86+
*/
87+
@ExperimentalCoroutinesApi // since version 1.4.0
88+
public fun <T> ReceiveChannel<T>.receiveAsFlow(): Flow<T> = ChannelAsFlow(this, consume = false)
89+
6690
/**
6791
* Represents the given receive channel as a hot flow and [consumes][ReceiveChannel.consume] the channel
6892
* on the first collection from this flow. The resulting flow can be collected just once and throws
6993
* [IllegalStateException] when trying to collect it more than once.
7094
*
95+
* See also [receiveAsFlow] which supports multiple collectors of the resulting flow.
96+
*
7197
* ### Cancellation semantics
7298
*
73-
* 1) Flow consumer is cancelled when the original channel is cancelled.
74-
* 2) Flow consumer completes normally when the original channel was closed normally and then fully consumed.
75-
* 3) If the flow consumer fails with an exception, channel is cancelled.
99+
* * Flow collector is cancelled when the original channel is [closed][SendChannel.close] with an exception.
100+
* * Flow collector completes normally when the original channel is [closed][SendChannel.close] normally.
101+
* * If the flow collector fails with an exception, the source channel is [cancelled][ReceiveChannel.cancel].
76102
*
77103
* ### Operator fusion
78104
*
79105
* Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `consumeAsFlow` are fused.
80106
* In particular, [produceIn] returns the original channel (but throws [IllegalStateException] on repeated calls).
81107
* Calls to [flowOn] have generally no effect, unless [buffer] is used to explicitly request buffering.
82108
*/
83-
@FlowPreview
84-
public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> = ConsumeAsFlow(this)
109+
@ExperimentalCoroutinesApi // since version 1.3.0
110+
public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> = ChannelAsFlow(this, consume = true)
85111

86112
/**
87113
* Represents an existing [channel] as [ChannelFlow] implementation.
88114
* It fuses with subsequent [flowOn] operators, but for the most part ignores the specified context.
89115
* However, additional [buffer] calls cause a separate buffering channel to be created and that is where
90116
* the context might play a role, because it is used by the producing coroutine.
91117
*/
92-
private class ConsumeAsFlow<T>(
118+
private class ChannelAsFlow<T>(
93119
private val channel: ReceiveChannel<T>,
120+
private val consume: Boolean,
94121
context: CoroutineContext = EmptyCoroutineContext,
95122
capacity: Int = Channel.OPTIONAL_CHANNEL
96123
) : ChannelFlow<T>(context, capacity) {
97124
private val consumed = atomic(false)
98125

99-
private fun markConsumed() =
100-
check(!consumed.getAndSet(true)) { "ReceiveChannel.consumeAsFlow can be collected just once" }
126+
private fun markConsumed() {
127+
if (consume) {
128+
check(!consumed.getAndSet(true)) { "ReceiveChannel.consumeAsFlow can be collected just once" }
129+
}
130+
}
101131

102132
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
103-
ConsumeAsFlow(channel, context, capacity)
133+
ChannelAsFlow(channel, consume, context, capacity)
104134

105135
override suspend fun collectTo(scope: ProducerScope<T>) =
106-
SendingCollector(scope).emitAll(channel) // use efficient channel receiving code from emitAll
136+
SendingCollector(scope).emitAllImpl(channel, consume) // use efficient channel receiving code from emitAll
107137

108138
override fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> {
109139
markConsumed() // fail fast on repeated attempt to collect it
@@ -121,7 +151,7 @@ private class ConsumeAsFlow<T>(
121151
override suspend fun collect(collector: FlowCollector<T>) {
122152
if (capacity == Channel.OPTIONAL_CHANNEL) {
123153
markConsumed()
124-
collector.emitAll(channel) // direct
154+
collector.emitAllImpl(channel, consume) // direct
125155
} else {
126156
super.collect(collector) // extra buffering channel, produceImpl will mark it as consumed
127157
}

kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt

+48
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,18 @@ class ChannelBuildersFlowTest : TestBase() {
2121
assertFailsWith<IllegalStateException> { flow.collect() }
2222
}
2323

24+
@Test
25+
fun testChannelReceiveAsFlow() = runTest {
26+
val channel = produce {
27+
repeat(10) {
28+
send(it + 1)
29+
}
30+
}
31+
val flow = channel.receiveAsFlow()
32+
assertEquals(55, flow.sum())
33+
assertEquals(emptyList(), flow.toList())
34+
}
35+
2436
@Test
2537
fun testConsumeAsFlowCancellation() = runTest {
2638
val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well
@@ -36,6 +48,20 @@ class ChannelBuildersFlowTest : TestBase() {
3648
assertFailsWith<IllegalStateException> { flow.collect() }
3749
}
3850

51+
@Test
52+
fun testReceiveAsFlowCancellation() = runTest {
53+
val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well
54+
repeat(10) {
55+
send(it + 1)
56+
}
57+
throw TestException()
58+
}
59+
val flow = channel.receiveAsFlow()
60+
assertEquals(15, flow.take(5).sum()) // sum of first 5
61+
assertEquals(40, flow.take(5).sum()) // sum the rest 5
62+
assertFailsWith<TestException> { flow.sum() } // exception in the rest
63+
}
64+
3965
@Test
4066
fun testConsumeAsFlowException() = runTest {
4167
val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well
@@ -49,6 +75,19 @@ class ChannelBuildersFlowTest : TestBase() {
4975
assertFailsWith<IllegalStateException> { flow.collect() }
5076
}
5177

78+
@Test
79+
fun testReceiveAsFlowException() = runTest {
80+
val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well
81+
repeat(10) {
82+
send(it + 1)
83+
}
84+
throw TestException()
85+
}
86+
val flow = channel.receiveAsFlow()
87+
assertFailsWith<TestException> { flow.sum() }
88+
assertFailsWith<TestException> { flow.collect() } // repeated collection -- same exception
89+
}
90+
5291
@Test
5392
fun testConsumeAsFlowProduceFusing() = runTest {
5493
val channel = produce { send("OK") }
@@ -58,6 +97,15 @@ class ChannelBuildersFlowTest : TestBase() {
5897
channel.cancel()
5998
}
6099

100+
@Test
101+
fun testReceiveAsFlowProduceFusing() = runTest {
102+
val channel = produce { send("OK") }
103+
val flow = channel.receiveAsFlow()
104+
assertSame(channel, flow.produceIn(this))
105+
assertSame(channel, flow.produceIn(this)) // can use produce multiple times
106+
channel.cancel()
107+
}
108+
61109
@Test
62110
fun testConsumeAsFlowProduceBuffered() = runTest {
63111
expect(1)

0 commit comments

Comments
 (0)