Skip to content

Commit 305154b

Browse files
committed
Added rx2//rx3 stress tests for ObservableSource.asFlow()
1 parent 1ec50fe commit 305154b

File tree

2 files changed

+178
-0
lines changed

2 files changed

+178
-0
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.rx2
6+
7+
import io.reactivex.*
8+
import io.reactivex.exceptions.*
9+
import kotlinx.coroutines.*
10+
import kotlinx.coroutines.flow.*
11+
import org.junit.*
12+
import java.util.concurrent.*
13+
14+
class ObservableSourceAsFlowStressTest : TestBase() {
15+
16+
private val maxIterations = 25
17+
private val collectorJobCancelDelay = 1_000L
18+
private val nextIterationDelay = collectorJobCancelDelay * 2
19+
20+
private var jobCancelledException: Throwable? = null
21+
private val exceptionHandler = { throwable: Throwable ->
22+
jobCancelledException = extractJobCancelledException(throwable)
23+
}
24+
25+
@Before
26+
fun setup() {
27+
ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
28+
}
29+
30+
@Test
31+
fun testObservableSourceAsFlow_doesntThrowJobCancelledException() = withExceptionHandler(exceptionHandler) {
32+
val collectorThread = newSingleThreadContext("Collector Thread")
33+
val cancellerThread = newSingleThreadContext("Canceller Thread")
34+
val scope = CoroutineScope(Job())
35+
var iteration = 0
36+
37+
while (jobCancelledException == null && iteration < maxIterations) {
38+
scope.runIteration(collectorThread, cancellerThread)
39+
iteration += 1
40+
41+
Thread.sleep(nextIterationDelay)
42+
43+
collectorThread.cancel()
44+
cancellerThread.cancel()
45+
}
46+
47+
collectorThread.close()
48+
cancellerThread.close()
49+
scope.cancel()
50+
51+
jobCancelledException?.also {
52+
throw error("ObservableSource.asFlow() cancellation caused exception in iteration # $iteration", it)
53+
}
54+
}
55+
56+
private fun CoroutineScope.runIteration(
57+
collectorThread: ExecutorCoroutineDispatcher,
58+
cancellerThread: ExecutorCoroutineDispatcher
59+
) {
60+
val outerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS)
61+
val innerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS)
62+
63+
val collectorJob = launch(collectorThread) {
64+
outerObservable
65+
.asFlow()
66+
.flatMapLatest {
67+
innerObservable.asFlow()
68+
}
69+
.collect { delay(100) }
70+
}
71+
72+
launch(cancellerThread) {
73+
delay(collectorJobCancelDelay)
74+
collectorJob.cancel()
75+
}
76+
}
77+
78+
private fun extractJobCancelledException(throwable: Throwable): Throwable? {
79+
if (throwable is UndeliverableException) {
80+
if (throwable.cause !is InterruptedException) return throwable.cause
81+
}
82+
83+
if (throwable is InterruptedException) {
84+
if (throwable.cause !is InterruptedException) return throwable.cause
85+
}
86+
87+
return throwable
88+
}
89+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.rx3
6+
7+
import io.reactivex.rxjava3.core.*
8+
import io.reactivex.rxjava3.exceptions.*
9+
import kotlinx.coroutines.*
10+
import kotlinx.coroutines.flow.*
11+
import org.junit.*
12+
import java.util.concurrent.*
13+
14+
class ObservableSourceAsFlowStressTest : TestBase() {
15+
16+
private val maxIterations = 25
17+
private val collectorJobCancelDelay = 1_000L
18+
private val nextIterationDelay = collectorJobCancelDelay * 2
19+
20+
private var jobCancelledException: Throwable? = null
21+
private val exceptionHandler = { throwable: Throwable ->
22+
jobCancelledException = extractJobCancelledException(throwable)
23+
}
24+
25+
@Before
26+
fun setup() {
27+
ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
28+
}
29+
30+
@Test
31+
fun testObservableSourceAsFlow_doesntThrowJobCancelledException() = withExceptionHandler(exceptionHandler) {
32+
val collectorThread = newSingleThreadContext("Collector Thread")
33+
val cancellerThread = newSingleThreadContext("Canceller Thread")
34+
val scope = CoroutineScope(Job())
35+
var iteration = 0
36+
37+
while (jobCancelledException == null && iteration < maxIterations) {
38+
scope.runIteration(collectorThread, cancellerThread)
39+
iteration += 1
40+
41+
Thread.sleep(nextIterationDelay)
42+
43+
collectorThread.cancel()
44+
cancellerThread.cancel()
45+
}
46+
47+
collectorThread.close()
48+
cancellerThread.close()
49+
scope.cancel()
50+
51+
jobCancelledException?.also {
52+
throw error("ObservableSource.asFlow() cancellation caused exception in iteration # $iteration", it)
53+
}
54+
}
55+
56+
private fun CoroutineScope.runIteration(
57+
collectorThread: ExecutorCoroutineDispatcher,
58+
cancellerThread: ExecutorCoroutineDispatcher
59+
) {
60+
val outerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS)
61+
val innerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS)
62+
63+
val collectorJob = launch(collectorThread) {
64+
outerObservable
65+
.asFlow()
66+
.flatMapLatest {
67+
innerObservable.asFlow()
68+
}
69+
.collect { delay(100) }
70+
}
71+
72+
launch(cancellerThread) {
73+
delay(collectorJobCancelDelay)
74+
collectorJob.cancel()
75+
}
76+
}
77+
78+
private fun extractJobCancelledException(throwable: Throwable): Throwable? {
79+
if (throwable is UndeliverableException) {
80+
if (throwable.cause !is InterruptedException) return throwable.cause
81+
}
82+
83+
if (throwable is InterruptedException) {
84+
if (throwable.cause !is InterruptedException) return throwable.cause
85+
}
86+
87+
return throwable
88+
}
89+
}

0 commit comments

Comments
 (0)