From ba4976a8168b7a9537c5fab7dba66e38343be026 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 25 Apr 2023 20:24:42 +0200 Subject: [PATCH 1/2] Fix MutexCancellationStressTest flakiness --- .../jvm/test/MutexCancellationStressTest.kt | 109 ++++++++++-------- 1 file changed, 60 insertions(+), 49 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/test/MutexCancellationStressTest.kt b/kotlinx-coroutines-core/jvm/test/MutexCancellationStressTest.kt index eb6360dac0..7f036ed3ae 100644 --- a/kotlinx-coroutines-core/jvm/test/MutexCancellationStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/MutexCancellationStressTest.kt @@ -8,69 +8,80 @@ import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import org.junit.* +import org.junit.Test import java.util.concurrent.* +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.* class MutexCancellationStressTest : TestBase() { @Test - fun testStressCancellationDoesNotBreakMutex() = runTest { - val mutex = Mutex() - val mutexJobNumber = 3 - val mutexOwners = Array(mutexJobNumber) { "$it" } - val dispatcher = Executors.newFixedThreadPool(mutexJobNumber + 2).asCoroutineDispatcher() - var counter = 0 - val counterLocal = Array(mutexJobNumber) { LocalAtomicInt(0) } - val completed = LocalAtomicInt(0) - val mutexJobLauncher: (jobNumber: Int) -> Job = { jobId -> - val coroutineName = "MutexJob-$jobId" - launch(dispatcher + CoroutineName(coroutineName)) { - while (completed.value == 0) { - mutex.holdsLock(mutexOwners[(jobId + 1) % mutexJobNumber]) - if (mutex.tryLock(mutexOwners[jobId])) { - counterLocal[jobId].incrementAndGet() - counter++ - mutex.unlock(mutexOwners[jobId]) - } - mutex.withLock(mutexOwners[jobId]) { - counterLocal[jobId].incrementAndGet() - counter++ - } - select { - mutex.onLock(mutexOwners[jobId]) { + fun testStressCancellationDoesNotBreakMutex() { + runTest { + val mutex = Mutex() + val mutexJobNumber = 3 + val mutexOwners = Array(mutexJobNumber) { "$it" } + val dispatcher = Executors.newFixedThreadPool(mutexJobNumber + 2).asCoroutineDispatcher() + var counter = 0 + val counterLocal = Array(mutexJobNumber) { AtomicInteger(0) } + val completed = AtomicBoolean(false) + val mutexJobLauncher: (jobNumber: Int) -> Job = { jobId -> + val coroutineName = "MutexJob-$jobId" + // ATOMIC to always have a chance to proceed + launch(dispatcher + CoroutineName(coroutineName), CoroutineStart.ATOMIC) { + while (!completed.get()) { + // Stress out holdsLock + mutex.holdsLock(mutexOwners[(jobId + 1) % mutexJobNumber]) + // Stress out lock-like primitives + if (mutex.tryLock(mutexOwners[jobId])) { counterLocal[jobId].incrementAndGet() counter++ mutex.unlock(mutexOwners[jobId]) } + mutex.withLock(mutexOwners[jobId]) { + counterLocal[jobId].incrementAndGet() + counter++ + } + select { + mutex.onLock(mutexOwners[jobId]) { + counterLocal[jobId].incrementAndGet() + counter++ + mutex.unlock(mutexOwners[jobId]) + } + } } } } - } - val mutexJobs = (0 until mutexJobNumber).map { mutexJobLauncher(it) }.toMutableList() - val checkProgressJob = launch(dispatcher + CoroutineName("checkProgressJob")) { - var lastCounterLocalSnapshot = (0 until mutexJobNumber).map { 0 } - while (completed.value == 0) { - delay(1000) - val c = counterLocal.map { it.value } - for (i in 0 until mutexJobNumber) { - assert(c[i] > lastCounterLocalSnapshot[i]) { "No progress in MutexJob-$i" } + val mutexJobs = (0 until mutexJobNumber).map { mutexJobLauncher(it) }.toMutableList() + val checkProgressJob = launch(dispatcher + CoroutineName("checkProgressJob")) { + var lastCounterLocalSnapshot = (0 until mutexJobNumber).map { 0 } + while (!completed.get()) { + delay(500) + // If we've caught the completion after delay, then there is a chance no progress were made whatsoever, bail out + if (completed.get()) return@launch + val c = counterLocal.map { it.value } + for (i in 0 until mutexJobNumber) { + assert(c[i] > lastCounterLocalSnapshot[i]) { "No progress in MutexJob-$i, last observed state: ${c[i]}" } + } + lastCounterLocalSnapshot = c } - lastCounterLocalSnapshot = c } - } - val cancellationJob = launch(dispatcher + CoroutineName("cancellationJob")) { - var cancellingJobId = 0 - while (completed.value == 0) { - val jobToCancel = mutexJobs.removeFirst() - jobToCancel.cancelAndJoin() - mutexJobs += mutexJobLauncher(cancellingJobId) - cancellingJobId = (cancellingJobId + 1) % mutexJobNumber + val cancellationJob = launch(dispatcher + CoroutineName("cancellationJob")) { + var cancellingJobId = 0 + while (!completed.get()) { + val jobToCancel = mutexJobs.removeFirst() + jobToCancel.cancelAndJoin() + mutexJobs += mutexJobLauncher(cancellingJobId) + cancellingJobId = (cancellingJobId + 1) % mutexJobNumber + } } + delay(2000L * stressTestMultiplier) + completed.set(true) + cancellationJob.join() + mutexJobs.forEach { it.join() } + checkProgressJob.join() + assertEquals(counter, counterLocal.sumOf { it.value }) + dispatcher.close() } - delay(2000L * stressTestMultiplier) - completed.value = 1 - cancellationJob.join() - mutexJobs.forEach { it.join() } - checkProgressJob.join() - check(counter == counterLocal.sumOf { it.value }) - dispatcher.close() } } From 715a04ae269c5ae0292626aa7165269dc1cb1860 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 26 Apr 2023 17:33:44 +0200 Subject: [PATCH 2/2] ~restore diff/after-squash history --- .../jvm/test/MutexCancellationStressTest.kt | 110 +++++++++--------- 1 file changed, 54 insertions(+), 56 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/test/MutexCancellationStressTest.kt b/kotlinx-coroutines-core/jvm/test/MutexCancellationStressTest.kt index 7f036ed3ae..20798b837d 100644 --- a/kotlinx-coroutines-core/jvm/test/MutexCancellationStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/MutexCancellationStressTest.kt @@ -16,72 +16,70 @@ import kotlin.test.* class MutexCancellationStressTest : TestBase() { @Test - fun testStressCancellationDoesNotBreakMutex() { - runTest { - val mutex = Mutex() - val mutexJobNumber = 3 - val mutexOwners = Array(mutexJobNumber) { "$it" } - val dispatcher = Executors.newFixedThreadPool(mutexJobNumber + 2).asCoroutineDispatcher() - var counter = 0 - val counterLocal = Array(mutexJobNumber) { AtomicInteger(0) } - val completed = AtomicBoolean(false) - val mutexJobLauncher: (jobNumber: Int) -> Job = { jobId -> - val coroutineName = "MutexJob-$jobId" - // ATOMIC to always have a chance to proceed - launch(dispatcher + CoroutineName(coroutineName), CoroutineStart.ATOMIC) { - while (!completed.get()) { - // Stress out holdsLock - mutex.holdsLock(mutexOwners[(jobId + 1) % mutexJobNumber]) - // Stress out lock-like primitives - if (mutex.tryLock(mutexOwners[jobId])) { + fun testStressCancellationDoesNotBreakMutex() = runTest { + val mutex = Mutex() + val mutexJobNumber = 3 + val mutexOwners = Array(mutexJobNumber) { "$it" } + val dispatcher = Executors.newFixedThreadPool(mutexJobNumber + 2).asCoroutineDispatcher() + var counter = 0 + val counterLocal = Array(mutexJobNumber) { AtomicInteger(0) } + val completed = AtomicBoolean(false) + val mutexJobLauncher: (jobNumber: Int) -> Job = { jobId -> + val coroutineName = "MutexJob-$jobId" + // ATOMIC to always have a chance to proceed + launch(dispatcher + CoroutineName(coroutineName), CoroutineStart.ATOMIC) { + while (!completed.get()) { + // Stress out holdsLock + mutex.holdsLock(mutexOwners[(jobId + 1) % mutexJobNumber]) + // Stress out lock-like primitives + if (mutex.tryLock(mutexOwners[jobId])) { + counterLocal[jobId].incrementAndGet() + counter++ + mutex.unlock(mutexOwners[jobId]) + } + mutex.withLock(mutexOwners[jobId]) { + counterLocal[jobId].incrementAndGet() + counter++ + } + select { + mutex.onLock(mutexOwners[jobId]) { counterLocal[jobId].incrementAndGet() counter++ mutex.unlock(mutexOwners[jobId]) } - mutex.withLock(mutexOwners[jobId]) { - counterLocal[jobId].incrementAndGet() - counter++ - } - select { - mutex.onLock(mutexOwners[jobId]) { - counterLocal[jobId].incrementAndGet() - counter++ - mutex.unlock(mutexOwners[jobId]) - } - } } } } - val mutexJobs = (0 until mutexJobNumber).map { mutexJobLauncher(it) }.toMutableList() - val checkProgressJob = launch(dispatcher + CoroutineName("checkProgressJob")) { - var lastCounterLocalSnapshot = (0 until mutexJobNumber).map { 0 } - while (!completed.get()) { - delay(500) - // If we've caught the completion after delay, then there is a chance no progress were made whatsoever, bail out - if (completed.get()) return@launch - val c = counterLocal.map { it.value } - for (i in 0 until mutexJobNumber) { - assert(c[i] > lastCounterLocalSnapshot[i]) { "No progress in MutexJob-$i, last observed state: ${c[i]}" } - } - lastCounterLocalSnapshot = c + } + val mutexJobs = (0 until mutexJobNumber).map { mutexJobLauncher(it) }.toMutableList() + val checkProgressJob = launch(dispatcher + CoroutineName("checkProgressJob")) { + var lastCounterLocalSnapshot = (0 until mutexJobNumber).map { 0 } + while (!completed.get()) { + delay(500) + // If we've caught the completion after delay, then there is a chance no progress were made whatsoever, bail out + if (completed.get()) return@launch + val c = counterLocal.map { it.value } + for (i in 0 until mutexJobNumber) { + assert(c[i] > lastCounterLocalSnapshot[i]) { "No progress in MutexJob-$i, last observed state: ${c[i]}" } } + lastCounterLocalSnapshot = c } - val cancellationJob = launch(dispatcher + CoroutineName("cancellationJob")) { - var cancellingJobId = 0 - while (!completed.get()) { - val jobToCancel = mutexJobs.removeFirst() - jobToCancel.cancelAndJoin() - mutexJobs += mutexJobLauncher(cancellingJobId) - cancellingJobId = (cancellingJobId + 1) % mutexJobNumber - } + } + val cancellationJob = launch(dispatcher + CoroutineName("cancellationJob")) { + var cancellingJobId = 0 + while (!completed.get()) { + val jobToCancel = mutexJobs.removeFirst() + jobToCancel.cancelAndJoin() + mutexJobs += mutexJobLauncher(cancellingJobId) + cancellingJobId = (cancellingJobId + 1) % mutexJobNumber } - delay(2000L * stressTestMultiplier) - completed.set(true) - cancellationJob.join() - mutexJobs.forEach { it.join() } - checkProgressJob.join() - assertEquals(counter, counterLocal.sumOf { it.value }) - dispatcher.close() } + delay(2000L * stressTestMultiplier) + completed.set(true) + cancellationJob.join() + mutexJobs.forEach { it.join() } + checkProgressJob.join() + assertEquals(counter, counterLocal.sumOf { it.value }) + dispatcher.close() } }