Skip to content

Commit d831a86

Browse files
mareklangiewiczqwwdfsad
authored andcommitted
Add ObservableSource.asFlow operator (#1768)
1 parent 12e96cd commit d831a86

File tree

4 files changed

+223
-4
lines changed

4 files changed

+223
-4
lines changed

reactive/kotlinx-coroutines-rx2/README.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@ Coroutine builders:
1414

1515
Integration with [Flow]:
1616

17-
| **Name** | **Result** | **Description**
18-
| --------------- | -------------- | ---------------
19-
| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable.
20-
| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable.
17+
| **Name** | **Result** | **Description**
18+
| --------------- | -------------- | ---------------
19+
| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable.
20+
| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable.
21+
| [ObservableSource.asFlow] | `Flow` | Converts the given cold ObservableSource to flow
2122

2223
Suspending extension functions and suspending iteration:
2324

@@ -67,6 +68,7 @@ Conversion functions:
6768
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html
6869
[Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-flowable.html
6970
[Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-observable.html
71+
[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/as-flow.html
7072
[io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-completable-source/await.html
7173
[io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await.html
7274
[io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await-or-default.html

reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public final class kotlinx/coroutines/rx2/RxCompletableKt {
2929

3030
public final class kotlinx/coroutines/rx2/RxConvertKt {
3131
public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Completable;
32+
public static final fun asFlow (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/flow/Flow;
3233
public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Maybe;
3334
public static final fun asObservable (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
3435
public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Single;

reactive/kotlinx-coroutines-rx2/src/RxConvert.kt

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
package kotlinx.coroutines.rx2
66

77
import io.reactivex.*
8+
import io.reactivex.disposables.*
89
import kotlinx.coroutines.*
910
import kotlinx.coroutines.channels.*
1011
import kotlinx.coroutines.flow.*
1112
import kotlinx.coroutines.reactive.*
13+
import java.util.concurrent.atomic.*
1214
import kotlin.coroutines.*
1315

1416
/**
@@ -77,6 +79,35 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext):
7779
send(t)
7880
}
7981

82+
/**
83+
* Transforms given cold [ObservableSource] into cold [Flow].
84+
*
85+
* The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator
86+
* is applied to the resulting flow.
87+
*
88+
* A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the
89+
* resulting flow to specify a user-defined value and to control what happens when data is produced faster
90+
* than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
91+
*/
92+
@ExperimentalCoroutinesApi
93+
public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
94+
val disposableRef = AtomicReference<Disposable>()
95+
val observer = object : Observer<T> {
96+
override fun onComplete() { close() }
97+
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
98+
override fun onNext(t: T) { sendBlocking(t) }
99+
override fun onError(e: Throwable) { close(e) }
100+
}
101+
102+
subscribe(observer)
103+
awaitClose { disposableRef.getAndSet(Disposed)?.dispose() }
104+
}
105+
106+
private object Disposed : Disposable {
107+
override fun isDisposed() = true
108+
override fun dispose() = Unit
109+
}
110+
80111
/**
81112
* Converts the given flow to a cold observable.
82113
* The original flow is cancelled when the observable subscriber is disposed.
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.rx2
6+
7+
import io.reactivex.Observable
8+
import io.reactivex.ObservableSource
9+
import io.reactivex.Observer
10+
import io.reactivex.disposables.Disposables
11+
import io.reactivex.subjects.PublishSubject
12+
import kotlinx.coroutines.*
13+
import kotlinx.coroutines.channels.*
14+
import kotlinx.coroutines.flow.*
15+
import kotlin.test.*
16+
17+
class ObservableAsFlowTest : TestBase() {
18+
@Test
19+
fun testCancellation() = runTest {
20+
var onNext = 0
21+
var onCancelled = 0
22+
var onError = 0
23+
24+
val source = rxObservable(currentDispatcher()) {
25+
coroutineContext[Job]?.invokeOnCompletion {
26+
if (it is CancellationException) ++onCancelled
27+
}
28+
29+
repeat(100) {
30+
send(it)
31+
}
32+
}
33+
34+
source.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) {
35+
onEach {
36+
++onNext
37+
throw RuntimeException()
38+
}
39+
catch<Throwable> {
40+
++onError
41+
}
42+
}.join()
43+
44+
45+
assertEquals(1, onNext)
46+
assertEquals(1, onError)
47+
assertEquals(1, onCancelled)
48+
}
49+
50+
@Test
51+
fun testImmediateCollection() {
52+
val source = PublishSubject.create<Int>()
53+
val flow = source.asFlow()
54+
GlobalScope.launch(Dispatchers.Unconfined) {
55+
expect(1)
56+
flow.collect { expect(it) }
57+
expect(6)
58+
}
59+
expect(2)
60+
source.onNext(3)
61+
expect(4)
62+
source.onNext(5)
63+
source.onComplete()
64+
finish(7)
65+
}
66+
67+
@Test
68+
fun testOnErrorCancellation() {
69+
val source = PublishSubject.create<Int>()
70+
val flow = source.asFlow()
71+
val exception = RuntimeException()
72+
GlobalScope.launch(Dispatchers.Unconfined) {
73+
try {
74+
expect(1)
75+
flow.collect { expect(it) }
76+
expectUnreached()
77+
}
78+
catch (e: Exception) {
79+
assertSame(exception, e.cause)
80+
expect(5)
81+
}
82+
expect(6)
83+
}
84+
expect(2)
85+
source.onNext(3)
86+
expect(4)
87+
source.onError(exception)
88+
finish(7)
89+
}
90+
91+
@Test
92+
fun testUnsubscribeOnCollectionException() {
93+
val source = PublishSubject.create<Int>()
94+
val flow = source.asFlow()
95+
val exception = RuntimeException()
96+
GlobalScope.launch(Dispatchers.Unconfined) {
97+
try {
98+
expect(1)
99+
flow.collect {
100+
expect(it)
101+
if (it == 3) throw exception
102+
}
103+
expectUnreached()
104+
}
105+
catch (e: Exception) {
106+
assertSame(exception, e.cause)
107+
expect(4)
108+
}
109+
expect(5)
110+
}
111+
expect(2)
112+
assertTrue(source.hasObservers())
113+
source.onNext(3)
114+
assertFalse(source.hasObservers())
115+
finish(6)
116+
}
117+
118+
@Test
119+
fun testLateOnSubscribe() {
120+
var observer: Observer<in Int>? = null
121+
val source = ObservableSource<Int> { observer = it }
122+
val flow = source.asFlow()
123+
assertNull(observer)
124+
val job = GlobalScope.launch(Dispatchers.Unconfined) {
125+
expect(1)
126+
flow.collect { expectUnreached() }
127+
expectUnreached()
128+
}
129+
expect(2)
130+
assertNotNull(observer)
131+
job.cancel()
132+
val disposable = Disposables.empty()
133+
observer!!.onSubscribe(disposable)
134+
assertTrue(disposable.isDisposed)
135+
finish(3)
136+
}
137+
138+
@Test
139+
fun testBufferUnlimited() = runTest {
140+
val source = rxObservable(currentDispatcher()) {
141+
expect(1); send(10)
142+
expect(2); send(11)
143+
expect(3); send(12)
144+
expect(4); send(13)
145+
expect(5); send(14)
146+
expect(6); send(15)
147+
expect(7); send(16)
148+
expect(8); send(17)
149+
expect(9)
150+
}
151+
source.asFlow().buffer(Channel.UNLIMITED).collect { expect(it) }
152+
finish(18)
153+
}
154+
155+
@Test
156+
fun testConflated() = runTest {
157+
val source = Observable.range(1, 5)
158+
val list = source.asFlow().conflate().toList()
159+
assertEquals(listOf(1, 5), list)
160+
}
161+
162+
@Test
163+
fun testLongRange() = runTest {
164+
val source = Observable.range(1, 10_000)
165+
val count = source.asFlow().count()
166+
assertEquals(10_000, count)
167+
}
168+
169+
@Test
170+
fun testProduce() = runTest {
171+
val source = Observable.range(0, 10)
172+
val flow = source.asFlow()
173+
check((0..9).toList(), flow.produceIn(this))
174+
check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this))
175+
check((0..9).toList(), flow.buffer(2).produceIn(this))
176+
check((0..9).toList(), flow.buffer(0).produceIn(this))
177+
check(listOf(0, 9), flow.conflate().produceIn(this))
178+
}
179+
180+
private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
181+
val result = ArrayList<Int>(10)
182+
channel.consumeEach { result.add(it) }
183+
assertEquals(expected, result)
184+
}
185+
}

0 commit comments

Comments
 (0)