Skip to content

Commit d02febf

Browse files
committed
Fixes linearizability of Channel.close in advanced receive+send case
We cannot resume closed receives until all receivers are removed from the list. Consider channel state: head -> [receive_1] -> [receive_2] -> head - T1 called receive_2, and will call send() when it's receive call resumes - T2 calls close() Now if T2's close resumes T1's receive_2 then it's receive gets "closed for receive" exception, but its subsequent attempt to send successfully rendezvous with receive_1, producing non-linearizable execution.
1 parent 489c9bb commit d02febf

File tree

1 file changed

+31
-15
lines changed

1 file changed

+31
-15
lines changed

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -287,33 +287,49 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
287287
private fun helpClose(closed: Closed<*>) {
288288
/*
289289
* It's important to traverse list from right to left to avoid races with sender.
290-
* Consider channel state
291-
* head sentinel -> [receiver 1] -> [receiver 2] -> head sentinel
292-
* T1 invokes receive()
293-
* T2 invokes close()
294-
* T3 invokes close() + send(value)
290+
* Consider channel state: head -> [receive_1] -> [receive_2] -> head
291+
* - T1 calls receive()
292+
* - T2 calls close()
293+
* - T3 calls close() + send(value)
295294
*
296295
* If both will traverse list from left to right, following non-linearizable history is possible:
297296
* [close -> false], [send -> transferred 'value' to receiver]
297+
*
298+
* Another problem with linearizability of close is that we cannot resume closed receives until all
299+
* receivers are removed from the list.
300+
* Consider channel state: head -> [receive_1] -> [receive_2] -> head
301+
* - T1 called receive_2, and will call send() when it's receive call resumes
302+
* - T2 calls close()
303+
*
304+
* Now if T2's close resumes T1's receive_2 then it's receive gets "closed for receive" exception, but
305+
* its subsequent attempt to send successfully rendezvous with receive_1, producing non-linearizable execution.
298306
*/
307+
var closedNode: Receive<E>? = null // used when one node was closed to avoid extra memory allocation
308+
var closedList: ArrayList<Receive<E>>? = null // used when more nodes were closed
299309
while (true) {
300-
val previous = closed.prevNode
301-
// Channel is empty or has no receivers
302-
if (previous is LockFreeLinkedListHead || previous !is Receive<*>) {
303-
break
304-
}
305-
310+
// Break when channel is empty or has no receivers
311+
@Suppress("UNCHECKED_CAST")
312+
val previous = closed.prevNode as? Receive<E> ?: break
306313
if (!previous.remove()) {
307314
// failed to remove the node (due to race) -- retry finding non-removed prevNode
308315
// NOTE: remove() DOES NOT help pending remove operation (that marked next pointer)
309316
previous.helpRemove() // make sure remove is complete before continuing
310317
continue
311318
}
312-
313-
@Suppress("UNCHECKED_CAST")
314-
previous as Receive<E> // type assertion
315-
previous.resumeReceiveClosed(closed)
319+
// add removed nodes to a separate list
320+
if (closedNode == null) {
321+
closedNode = previous
322+
} else {
323+
val list = closedList ?: ArrayList<Receive<E>>().also { closedList = it }
324+
list += previous
325+
}
326+
}
327+
// now notify all removed nodes that the channel was closed
328+
if (closedNode != null) {
329+
closedNode.resumeReceiveClosed(closed)
330+
closedList?.forEach { it.resumeReceiveClosed(closed) }
316331
}
332+
// and do other post-processing
317333
onClosedIdempotent(closed)
318334
}
319335

0 commit comments

Comments
 (0)