Skip to content

Commit 45399e3

Browse files
committed
Write short and fast reproducer
1 parent 670afde commit 45399e3

File tree

4 files changed

+26
-131
lines changed

4 files changed

+26
-131
lines changed

reactive/kotlinx-coroutines-rx2/src/RxConvert.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
8484
override fun onNext(t: T) {
8585
try {
8686
sendBlocking(t)
87-
} catch (ignored: Throwable) { //TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
87+
} catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
88+
// Is handled by the downstream flow
8889
}
8990
}
9091
override fun onError(e: Throwable) { close(e) }

reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt

Lines changed: 11 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -5,85 +5,31 @@
55
package kotlinx.coroutines.rx2
66

77
import io.reactivex.*
8-
import io.reactivex.exceptions.*
98
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.channels.*
1010
import kotlinx.coroutines.flow.*
1111
import org.junit.*
1212
import java.util.concurrent.*
1313

1414
class ObservableSourceAsFlowStressTest : TestBase() {
1515

16-
private val maxIterations = 25
17-
private val collectorJobCancelDelay = 1_000L
18-
private val nextIterationDelay = collectorJobCancelDelay * 2
19-
20-
private var jobCancelledException: Throwable? = null
21-
private val exceptionHandler = { throwable: Throwable ->
22-
jobCancelledException = extractJobCancelledException(throwable)
23-
}
16+
private val iterations = 100 * stressTestMultiplierSqrt
2417

2518
@Before
2619
fun setup() {
2720
ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
2821
}
2922

3023
@Test
31-
fun testObservableSourceAsFlow_doesntThrowJobCancelledException() = withExceptionHandler(exceptionHandler) {
32-
val collectorThread = newSingleThreadContext("Collector Thread")
33-
val cancellerThread = newSingleThreadContext("Canceller Thread")
34-
val scope = CoroutineScope(Job())
35-
var iteration = 0
36-
37-
while (jobCancelledException == null && iteration < maxIterations) {
38-
scope.runIteration(collectorThread, cancellerThread)
39-
iteration += 1
40-
41-
Thread.sleep(nextIterationDelay)
42-
43-
collectorThread.cancel()
44-
cancellerThread.cancel()
45-
}
46-
47-
collectorThread.close()
48-
cancellerThread.close()
49-
scope.cancel()
50-
51-
jobCancelledException?.also {
52-
throw error("ObservableSource.asFlow() cancellation caused exception in iteration # $iteration", it)
53-
}
54-
}
55-
56-
private fun CoroutineScope.runIteration(
57-
collectorThread: ExecutorCoroutineDispatcher,
58-
cancellerThread: ExecutorCoroutineDispatcher
59-
) {
60-
val outerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS)
61-
val innerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS)
62-
63-
val collectorJob = launch(collectorThread) {
64-
outerObservable
65-
.asFlow()
66-
.flatMapLatest {
67-
innerObservable.asFlow()
68-
}
69-
.collect { delay(100) }
70-
}
71-
72-
launch(cancellerThread) {
73-
delay(collectorJobCancelDelay)
74-
collectorJob.cancel()
75-
}
76-
}
77-
78-
private fun extractJobCancelledException(throwable: Throwable): Throwable? {
79-
if (throwable is UndeliverableException) {
80-
if (throwable.cause !is InterruptedException) return throwable.cause
24+
fun testAsFlowCancellation() = runTest {
25+
repeat(iterations) {
26+
val latch = Channel<Unit>(1)
27+
var i = 0
28+
val observable = Observable.interval(100L, TimeUnit.MICROSECONDS)
29+
.doOnNext { if (++i > 100) latch.offer(Unit) }
30+
val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
31+
latch.receive()
32+
job.cancelAndJoin()
8133
}
82-
83-
if (throwable is InterruptedException) {
84-
if (throwable.cause !is InterruptedException) return throwable.cause
85-
}
86-
87-
return throwable
8834
}
8935
}

reactive/kotlinx-coroutines-rx3/src/RxConvert.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
8484
override fun onNext(t: T) {
8585
try {
8686
sendBlocking(t)
87-
} catch (ignored: Throwable) { //TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
87+
} catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
88+
// Is handled by the downstream flow
8889
}
8990
}
9091
override fun onError(e: Throwable) { close(e) }

reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt

Lines changed: 11 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -7,83 +7,30 @@ package kotlinx.coroutines.rx3
77
import io.reactivex.rxjava3.core.*
88
import io.reactivex.rxjava3.exceptions.*
99
import kotlinx.coroutines.*
10+
import kotlinx.coroutines.channels.*
1011
import kotlinx.coroutines.flow.*
1112
import org.junit.*
1213
import java.util.concurrent.*
1314

1415
class ObservableSourceAsFlowStressTest : TestBase() {
1516

16-
private val maxIterations = 25
17-
private val collectorJobCancelDelay = 1_000L
18-
private val nextIterationDelay = collectorJobCancelDelay * 2
19-
20-
private var jobCancelledException: Throwable? = null
21-
private val exceptionHandler = { throwable: Throwable ->
22-
jobCancelledException = extractJobCancelledException(throwable)
23-
}
17+
private val iterations = 100 * stressTestMultiplierSqrt
2418

2519
@Before
2620
fun setup() {
2721
ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
2822
}
2923

3024
@Test
31-
fun testObservableSourceAsFlow_doesntThrowJobCancelledException() = withExceptionHandler(exceptionHandler) {
32-
val collectorThread = newSingleThreadContext("Collector Thread")
33-
val cancellerThread = newSingleThreadContext("Canceller Thread")
34-
val scope = CoroutineScope(Job())
35-
var iteration = 0
36-
37-
while (jobCancelledException == null && iteration < maxIterations) {
38-
scope.runIteration(collectorThread, cancellerThread)
39-
iteration += 1
40-
41-
Thread.sleep(nextIterationDelay)
42-
43-
collectorThread.cancel()
44-
cancellerThread.cancel()
45-
}
46-
47-
collectorThread.close()
48-
cancellerThread.close()
49-
scope.cancel()
50-
51-
jobCancelledException?.also {
52-
throw error("ObservableSource.asFlow() cancellation caused exception in iteration # $iteration", it)
53-
}
54-
}
55-
56-
private fun CoroutineScope.runIteration(
57-
collectorThread: ExecutorCoroutineDispatcher,
58-
cancellerThread: ExecutorCoroutineDispatcher
59-
) {
60-
val outerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS)
61-
val innerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS)
62-
63-
val collectorJob = launch(collectorThread) {
64-
outerObservable
65-
.asFlow()
66-
.flatMapLatest {
67-
innerObservable.asFlow()
68-
}
69-
.collect { delay(100) }
70-
}
71-
72-
launch(cancellerThread) {
73-
delay(collectorJobCancelDelay)
74-
collectorJob.cancel()
75-
}
76-
}
77-
78-
private fun extractJobCancelledException(throwable: Throwable): Throwable? {
79-
if (throwable is UndeliverableException) {
80-
if (throwable.cause !is InterruptedException) return throwable.cause
25+
fun testAsFlowCancellation() = runTest {
26+
repeat(iterations) {
27+
val latch = Channel<Unit>(1)
28+
var i = 0
29+
val observable = Observable.interval(100L, TimeUnit.MICROSECONDS)
30+
.doOnNext { if (++i > 100) latch.offer(Unit) }
31+
val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
32+
latch.receive()
33+
job.cancelAndJoin()
8134
}
82-
83-
if (throwable is InterruptedException) {
84-
if (throwable.cause !is InterruptedException) return throwable.cause
85-
}
86-
87-
return throwable
8835
}
8936
}

0 commit comments

Comments
 (0)