|
| 1 | +/* |
| 2 | + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| 3 | + */ |
| 4 | + |
| 5 | +package kotlinx.coroutines.rx3 |
| 6 | + |
| 7 | +import io.reactivex.rxjava3.core.* |
| 8 | +import io.reactivex.rxjava3.exceptions.* |
| 9 | +import kotlinx.coroutines.* |
| 10 | +import kotlinx.coroutines.flow.* |
| 11 | +import org.junit.* |
| 12 | +import java.util.concurrent.* |
| 13 | + |
| 14 | +class ObservableSourceAsFlowStressTest : TestBase() { |
| 15 | + |
| 16 | + private val maxIterations = 25 |
| 17 | + private val collectorJobCancelDelay = 1_000L |
| 18 | + private val nextIterationDelay = collectorJobCancelDelay * 2 |
| 19 | + |
| 20 | + private var jobCancelledException: Throwable? = null |
| 21 | + private val exceptionHandler = { throwable: Throwable -> |
| 22 | + jobCancelledException = extractJobCancelledException(throwable) |
| 23 | + } |
| 24 | + |
| 25 | + @Before |
| 26 | + fun setup() { |
| 27 | + ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") |
| 28 | + } |
| 29 | + |
| 30 | + @Test |
| 31 | + fun testObservableSourceAsFlow_doesntThrowJobCancelledException() = withExceptionHandler(exceptionHandler) { |
| 32 | + val collectorThread = newSingleThreadContext("Collector Thread") |
| 33 | + val cancellerThread = newSingleThreadContext("Canceller Thread") |
| 34 | + val scope = CoroutineScope(Job()) |
| 35 | + var iteration = 0 |
| 36 | + |
| 37 | + while (jobCancelledException == null && iteration < maxIterations) { |
| 38 | + scope.runIteration(collectorThread, cancellerThread) |
| 39 | + iteration += 1 |
| 40 | + |
| 41 | + Thread.sleep(nextIterationDelay) |
| 42 | + |
| 43 | + collectorThread.cancel() |
| 44 | + cancellerThread.cancel() |
| 45 | + } |
| 46 | + |
| 47 | + collectorThread.close() |
| 48 | + cancellerThread.close() |
| 49 | + scope.cancel() |
| 50 | + |
| 51 | + jobCancelledException?.also { |
| 52 | + throw error("ObservableSource.asFlow() cancellation caused exception in iteration # $iteration", it) |
| 53 | + } |
| 54 | + } |
| 55 | + |
| 56 | + private fun CoroutineScope.runIteration( |
| 57 | + collectorThread: ExecutorCoroutineDispatcher, |
| 58 | + cancellerThread: ExecutorCoroutineDispatcher |
| 59 | + ) { |
| 60 | + val outerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS) |
| 61 | + val innerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS) |
| 62 | + |
| 63 | + val collectorJob = launch(collectorThread) { |
| 64 | + outerObservable |
| 65 | + .asFlow() |
| 66 | + .flatMapLatest { |
| 67 | + innerObservable.asFlow() |
| 68 | + } |
| 69 | + .collect { delay(100) } |
| 70 | + } |
| 71 | + |
| 72 | + launch(cancellerThread) { |
| 73 | + delay(collectorJobCancelDelay) |
| 74 | + collectorJob.cancel() |
| 75 | + } |
| 76 | + } |
| 77 | + |
| 78 | + private fun extractJobCancelledException(throwable: Throwable): Throwable? { |
| 79 | + if (throwable is UndeliverableException) { |
| 80 | + if (throwable.cause !is InterruptedException) return throwable.cause |
| 81 | + } |
| 82 | + |
| 83 | + if (throwable is InterruptedException) { |
| 84 | + if (throwable.cause !is InterruptedException) return throwable.cause |
| 85 | + } |
| 86 | + |
| 87 | + return throwable |
| 88 | + } |
| 89 | +} |
0 commit comments