Skip to content

Commit bf33052

Browse files
qwwdfsadelizarov
authored andcommitted
Introduce InlineList to simplify helpClose logic, reverse helpClose resume order
1 parent d02febf commit bf33052

File tree

3 files changed

+85
-14
lines changed

3 files changed

+85
-14
lines changed

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -304,8 +304,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
304304
* Now if T2's close resumes T1's receive_2 then it's receive gets "closed for receive" exception, but
305305
* its subsequent attempt to send successfully rendezvous with receive_1, producing non-linearizable execution.
306306
*/
307-
var closedNode: Receive<E>? = null // used when one node was closed to avoid extra memory allocation
308-
var closedList: ArrayList<Receive<E>>? = null // used when more nodes were closed
307+
var closedList = InlineList<Receive<E>>()
309308
while (true) {
310309
// Break when channel is empty or has no receivers
311310
@Suppress("UNCHECKED_CAST")
@@ -316,19 +315,14 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
316315
previous.helpRemove() // make sure remove is complete before continuing
317316
continue
318317
}
319-
// add removed nodes to a separate list
320-
if (closedNode == null) {
321-
closedNode = previous
322-
} else {
323-
val list = closedList ?: ArrayList<Receive<E>>().also { closedList = it }
324-
list += previous
325-
}
326-
}
327-
// now notify all removed nodes that the channel was closed
328-
if (closedNode != null) {
329-
closedNode.resumeReceiveClosed(closed)
330-
closedList?.forEach { it.resumeReceiveClosed(closed) }
318+
// add removed nodes to a separate list
319+
closedList += previous
331320
}
321+
/*
322+
* Now notify all removed nodes that the channel was closed
323+
* in the order they were added to the channel
324+
*/
325+
closedList.forEachReversed { it.resumeReceiveClosed(closed) }
332326
// and do other post-processing
333327
onClosedIdempotent(closed)
334328
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
@file:Suppress("UNCHECKED_CAST")
6+
7+
package kotlinx.coroutines.internal
8+
9+
import kotlinx.coroutines.assert
10+
11+
/*
12+
* Inline class that represents a mutable list, but does not allocate an underlying storage
13+
* for zero and one elements.
14+
* Cannot be parametrized with `List<*>`.
15+
*/
16+
internal inline class InlineList<E>(private val holder: Any? = null) {
17+
public operator fun plus(element: E): InlineList<E> {
18+
assert { element !is List<*> } // Lists are prohibited
19+
return when (holder) {
20+
null -> InlineList(element)
21+
is ArrayList<*> -> {
22+
(holder as ArrayList<E>).add(element)
23+
InlineList(holder)
24+
}
25+
else -> {
26+
val list = ArrayList<E>(4)
27+
list.add(holder as E)
28+
list.add(element)
29+
InlineList(list)
30+
}
31+
}
32+
}
33+
34+
public inline fun forEachReversed(action: (E) -> Unit) {
35+
when (holder) {
36+
null -> return
37+
!is ArrayList<*> -> action(holder as E)
38+
else -> {
39+
val list = holder as ArrayList<E>
40+
for (i in (list.size - 1) downTo 0) {
41+
action(list[i])
42+
}
43+
}
44+
}
45+
}
46+
}

kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,37 @@ class ChannelsTest: TestBase() {
1919
assertEquals(testList, testList.asReceiveChannel().toList())
2020
}
2121

22+
@Test
23+
fun testCloseWithMultipleWaiters() = runTest {
24+
val channel = Channel<Int>()
25+
launch {
26+
try {
27+
expect(2)
28+
channel.receive()
29+
expectUnreached()
30+
} catch (e: ClosedReceiveChannelException) {
31+
expect(5)
32+
}
33+
}
34+
35+
launch {
36+
try {
37+
expect(3)
38+
channel.receive()
39+
expectUnreached()
40+
} catch (e: ClosedReceiveChannelException) {
41+
expect(6)
42+
}
43+
}
44+
45+
expect(1)
46+
yield()
47+
expect(4)
48+
channel.close()
49+
yield()
50+
finish(7)
51+
}
52+
2253
@Test
2354
fun testAssociate() = runTest {
2455
assertEquals(testList.associate { it * 2 to it * 3 },

0 commit comments

Comments
 (0)