Skip to content

Commit 89fd2c1

Browse files
committed
Add some tests and fixes
* Fixed `doLockedNext` not releasing the lock in `PublisherCoroutine` if `null` is emitted * Fixed `flux`, `publish`, `rxObservable` and `rxFlowable` incorrectly reporting `isClosedForSend == true` just after closing the channel
1 parent e06faa9 commit 89fd2c1

File tree

11 files changed

+298
-40
lines changed

11 files changed

+298
-40
lines changed

reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines.jdk9
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.exceptions.*
89
import org.junit.Test
910
import kotlinx.coroutines.flow.flowOn
1011
import org.junit.runner.*
@@ -132,21 +133,13 @@ class IntegrationTest(
132133
}
133134

134135
internal suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith(
135-
crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E
136-
{
137-
val caughtExceptions = mutableListOf<Throwable>()
138-
val exceptionHandler = object: AbstractCoroutineContextElement(CoroutineExceptionHandler),
139-
CoroutineExceptionHandler
140-
{
141-
override fun handleException(context: CoroutineContext, exception: Throwable) {
142-
caughtExceptions += exception
143-
}
144-
}
145-
return withContext(exceptionHandler) {
146-
operation(exceptionHandler)
147-
caughtExceptions.single().let {
148-
assertTrue(it is E, it.toString())
149-
it
136+
crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E =
137+
CapturingHandler().let { handler ->
138+
withContext(handler) {
139+
operation(handler)
140+
handler.getException().let {
141+
assertTrue(it is E, it.toString())
142+
it
143+
}
150144
}
151-
}
152-
}
145+
}

reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55
package kotlinx.coroutines.jdk9
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
89
import org.junit.Test
10+
import java.lang.NullPointerException
11+
import java.util.concurrent.*
12+
import java.util.concurrent.CancellationException
913
import java.util.concurrent.Flow as JFlow
1014
import kotlin.test.*
1115

@@ -121,6 +125,25 @@ class PublishTest : TestBase() {
121125
finish(7)
122126
}
123127

128+
/** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */
129+
@Test
130+
fun testChannelClosing() = runTest {
131+
expect(1)
132+
val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
133+
expect(3)
134+
close()
135+
assert(isClosedForSend)
136+
expect(4)
137+
}
138+
try {
139+
expect(2)
140+
publisher.awaitFirstOrNull()
141+
} catch (e: CancellationException) {
142+
expect(5)
143+
}
144+
finish(6)
145+
}
146+
124147
@Test
125148
fun testOnNextError() = runTest {
126149
val latch = CompletableDeferred<Unit>()
@@ -130,9 +153,10 @@ class PublishTest : TestBase() {
130153
expect(4)
131154
try {
132155
send("OK")
133-
} catch(e: Throwable) {
156+
} catch (e: Throwable) {
134157
expect(6)
135158
assert(e is TestException)
159+
assert(isClosedForSend)
136160
latch.complete(Unit)
137161
}
138162
}
@@ -162,6 +186,51 @@ class PublishTest : TestBase() {
162186
finish(7)
163187
}
164188

189+
/** Tests the behavior when a call to `onNext` fails after the channel is already closed. */
190+
@Test
191+
fun testOnNextErrorAfterCancellation() = runTest {
192+
assertCallsExceptionHandlerWith<TestException> { handler ->
193+
var producerScope: ProducerScope<Int>? = null
194+
CompletableDeferred<Unit>()
195+
expect(1)
196+
var job: Job? = null
197+
val publisher = flowPublish<Int>(handler + Dispatchers.Unconfined) {
198+
producerScope = this
199+
expect(4)
200+
job = launch {
201+
delay(Long.MAX_VALUE)
202+
}
203+
}
204+
expect(2)
205+
publisher.subscribe(object: JFlow.Subscriber<Int> {
206+
override fun onSubscribe(s: JFlow.Subscription) {
207+
expect(3)
208+
s.request(Long.MAX_VALUE)
209+
}
210+
override fun onNext(t: Int) {
211+
expect(6)
212+
assertEquals(1, t)
213+
job!!.cancel()
214+
throw TestException()
215+
}
216+
override fun onError(t: Throwable?) {
217+
/* Correct changes to the implementation could lead to us entering or not entering this method, but
218+
it only matters that if we do, it is the "correct" exception that was validly used to cancel the
219+
coroutine that gets passed here and not `TestException`. */
220+
assertTrue(t is CancellationException)
221+
}
222+
override fun onComplete() { expectUnreached() }
223+
})
224+
expect(5)
225+
val result: ChannelResult<Unit> = producerScope!!.trySend(1)
226+
val e = result.exceptionOrNull()!!
227+
assertTrue(e is CancellationException, "The actual error: $e")
228+
assertTrue(producerScope!!.isClosedForSend)
229+
assertTrue(result.isFailure)
230+
}
231+
finish(7)
232+
}
233+
165234
@Test
166235
fun testFailingConsumer() = runTest {
167236
val pub = flowPublish(currentDispatcher()) {
@@ -183,4 +252,39 @@ class PublishTest : TestBase() {
183252
fun testIllegalArgumentException() {
184253
assertFailsWith<IllegalArgumentException> { flowPublish<Int>(Job()) { } }
185254
}
255+
256+
/** Tests that `trySend` doesn't throw in `flowPublish`. */
257+
@Test
258+
fun testTrySendNotThrowing() = runTest {
259+
var producerScope: ProducerScope<Int>? = null
260+
expect(1)
261+
val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
262+
producerScope = this
263+
expect(3)
264+
delay(Long.MAX_VALUE)
265+
}
266+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
267+
expect(2)
268+
publisher.awaitFirstOrNull()
269+
expectUnreached()
270+
}
271+
job.cancel()
272+
expect(4)
273+
val result = producerScope!!.trySend(1)
274+
assertTrue(result.isFailure)
275+
finish(5)
276+
}
277+
278+
/** Tests that all methods on `flowPublish` fail without closing the channel when attempting to emit `null`. */
279+
@Test
280+
fun testEmittingNull() = runTest {
281+
val publisher = flowPublish {
282+
assertFailsWith<NullPointerException> { send(null) }
283+
assertFailsWith<NullPointerException> { trySend(null) }
284+
@Suppress("DEPRECATION")
285+
assertFailsWith<NullPointerException> { offer(null) }
286+
send("OK")
287+
}
288+
assertEquals("OK", publisher.awaitFirstOrNull())
289+
}
186290
}

reactive/kotlinx-coroutines-reactive/src/Publish.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public class PublisherCoroutine<in T>(
7575
@Volatile
7676
private var cancelled = false // true after Subscription.cancel() is invoked
7777

78-
override val isClosedForSend: Boolean get() = isCompleted
78+
override val isClosedForSend: Boolean get() = !isActive
7979
override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
8080
override fun invokeOnClose(handler: (Throwable?) -> Unit): Nothing =
8181
throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose")
@@ -133,6 +133,7 @@ public class PublisherCoroutine<in T>(
133133
*/
134134
private fun doLockedNext(elem: T): Throwable? {
135135
if (elem == null) {
136+
unlockAndCheckCompleted()
136137
throw NullPointerException("Attempted to emit `null` inside a reactive publisher")
137138
}
138139
/** This guards against the case when the caller of this function managed to lock the mutex not because some
@@ -212,7 +213,7 @@ public class PublisherCoroutine<in T>(
212213
_nRequested.value = SIGNALLED // we'll signal onError/onCompleted (the final state, so no CAS needed)
213214
// Specification requires that after the cancellation is requested we eventually stop calling onXXX
214215
if (cancelled) {
215-
// If the parent had failed to handle our exception, then we must not lose this exception
216+
// If the parent failed to handle this exception, then we must not lose the exception
216217
if (cause != null && !handled) exceptionOnCancelHandler(cause, context)
217218
return
218219
}

reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,11 @@ public class FlowSubscription<T>(
208208
consumeFlow()
209209
true
210210
} catch (cause: Throwable) {
211+
/* TODO: The part after "||" is a hack needed due to [cause] having travelled over a coroutine boundary and
212+
being changed from the result of [getCancellationException()]. */
211213
if (cancellationRequested && !isActive && (cause === getCancellationException() || cause.cause === getCancellationException() && cause.message == getCancellationException().message)) {
214+
/* TODO: This is incorrect, as [Subscriber.onComplete] denotes the end of the stream and not just any
215+
non-erroneous terminal state. */
212216
subscriber.onComplete()
213217
} else {
214218
try {

reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55
package kotlinx.coroutines.reactive
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.exceptions.*
89
import org.junit.Test
910
import org.junit.runner.*
1011
import org.junit.runners.*
1112
import org.reactivestreams.*
1213
import java.lang.IllegalStateException
1314
import java.lang.RuntimeException
15+
import kotlin.contracts.*
1416
import kotlin.coroutines.*
1517
import kotlin.test.*
1618

@@ -235,20 +237,16 @@ class IntegrationTest(
235237

236238
}
237239

240+
@OptIn(ExperimentalContracts::class)
238241
internal suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith(
239-
crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E
240-
{
241-
val caughtExceptions = mutableListOf<Throwable>()
242-
val exceptionHandler = object: AbstractCoroutineContextElement(CoroutineExceptionHandler),
243-
CoroutineExceptionHandler
244-
{
245-
override fun handleException(context: CoroutineContext, exception: Throwable) {
246-
caughtExceptions += exception
247-
}
242+
crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E {
243+
contract {
244+
callsInPlace(operation, InvocationKind.EXACTLY_ONCE)
248245
}
249-
return withContext(exceptionHandler) {
250-
operation(exceptionHandler)
251-
caughtExceptions.single().let {
246+
val handler = CapturingHandler()
247+
return withContext(handler) {
248+
operation(handler)
249+
handler.getException().let {
252250
assertTrue(it is E, it.toString())
253251
it
254252
}

0 commit comments

Comments
 (0)