Skip to content

Commit cbd5b1c

Browse files
authored
Merge pull request #1652 from Kotlin/scheduler-changes
CoroutineScheduler rework Fixes #840 Fixes #1046 Fixes #1286
2 parents e60ec8e + 4224e01 commit cbd5b1c

31 files changed

+1003
-1042
lines changed

benchmarks/src/jmh/kotlin/benchmarks/ChannelProducerConsumerBenchmark.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
15
package benchmarks
26

37
import kotlinx.coroutines.*

benchmarks/src/jmh/kotlin/benchmarks/ParametrizedDispatcherBase.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
package benchmarks
66

7-
import benchmarks.actors.CORES_COUNT
7+
import benchmarks.akka.CORES_COUNT
88
import kotlinx.coroutines.*
99
import kotlinx.coroutines.scheduling.*
1010
import org.openjdk.jmh.annotations.Param
@@ -22,14 +22,14 @@ abstract class ParametrizedDispatcherBase : CoroutineScope {
2222

2323
abstract var dispatcher: String
2424
override lateinit var coroutineContext: CoroutineContext
25-
var closeable: Closeable? = null
25+
private var closeable: Closeable? = null
2626

27-
@UseExperimental(InternalCoroutinesApi::class)
2827
@Setup
28+
@UseExperimental(InternalCoroutinesApi::class)
2929
open fun setup() {
3030
coroutineContext = when {
3131
dispatcher == "fjp" -> ForkJoinPool.commonPool().asCoroutineDispatcher()
32-
dispatcher == "experimental" -> {
32+
dispatcher == "scheduler" -> {
3333
ExperimentalCoroutineDispatcher(CORES_COUNT).also { closeable = it }
3434
}
3535
dispatcher.startsWith("ftp") -> {

benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongAkkaBenchmark.kt renamed to benchmarks/src/jmh/kotlin/benchmarks/akka/PingPongAkkaBenchmark.kt

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5-
package benchmarks.actors
5+
package benchmarks.akka
66

77
import akka.actor.ActorRef
88
import akka.actor.ActorSystem
@@ -13,7 +13,6 @@ import org.openjdk.jmh.annotations.*
1313
import scala.concurrent.Await
1414
import scala.concurrent.duration.Duration
1515
import java.util.concurrent.CountDownLatch
16-
import java.util.concurrent.TimeUnit
1716

1817
const val N_MESSAGES = 100_000
1918

@@ -29,12 +28,12 @@ class Stop
2928
* PingPongAkkaBenchmark.singlePingPong default-dispatcher avgt 10 173.742 ± 41.984 ms/op
3029
* PingPongAkkaBenchmark.singlePingPong single-thread-dispatcher avgt 10 24.181 ± 0.730 ms/op
3130
*/
32-
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
33-
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
34-
@Fork(value = 2)
35-
@BenchmarkMode(Mode.AverageTime)
36-
@OutputTimeUnit(TimeUnit.MILLISECONDS)
37-
@State(Scope.Benchmark)
31+
//@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
32+
//@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
33+
//@Fork(value = 2)
34+
//@BenchmarkMode(Mode.AverageTime)
35+
//@OutputTimeUnit(TimeUnit.MILLISECONDS)
36+
//@State(Scope.Benchmark)
3837
open class PingPongAkkaBenchmark {
3938

4039
lateinit var system: ActorSystem
@@ -62,12 +61,12 @@ open class PingPongAkkaBenchmark {
6261
Await.ready(system.terminate(), Duration.Inf())
6362
}
6463

65-
@Benchmark
64+
// @Benchmark
6665
fun singlePingPong() {
6766
runPingPongs(1)
6867
}
6968

70-
@Benchmark
69+
// @Benchmark
7170
fun coresCountPingPongs() {
7271
runPingPongs(Runtime.getRuntime().availableProcessors())
7372
}

benchmarks/src/jmh/kotlin/benchmarks/actors/StatefulActorAkkaBenchmark.kt renamed to benchmarks/src/jmh/kotlin/benchmarks/akka/StatefulActorAkkaBenchmark.kt

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5-
package benchmarks.actors
5+
package benchmarks.akka
66

77
import akka.actor.ActorRef
88
import akka.actor.ActorSystem
@@ -14,7 +14,6 @@ import scala.concurrent.Await
1414
import scala.concurrent.duration.Duration
1515
import java.util.concurrent.CountDownLatch
1616
import java.util.concurrent.ThreadLocalRandom
17-
import java.util.concurrent.TimeUnit
1817

1918
const val ROUNDS = 10_000
2019
const val STATE_SIZE = 1024
@@ -38,12 +37,12 @@ val CORES_COUNT = Runtime.getRuntime().availableProcessors()
3837
* StatefulActorAkkaBenchmark.singleComputationSingleRequestor default-dispatcher avgt 14 39.964 ± 2.343 ms/op
3938
* StatefulActorAkkaBenchmark.singleComputationSingleRequestor single-thread-dispatcher avgt 14 10.214 ± 2.152 ms/op
4039
*/
41-
@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
42-
@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
43-
@Fork(value = 2)
44-
@BenchmarkMode(Mode.AverageTime)
45-
@OutputTimeUnit(TimeUnit.MILLISECONDS)
46-
@State(Scope.Benchmark)
40+
//@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
41+
//@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
42+
//@Fork(value = 2)
43+
//@BenchmarkMode(Mode.AverageTime)
44+
//@OutputTimeUnit(TimeUnit.MILLISECONDS)
45+
//@State(Scope.Benchmark)
4746
open class StatefulActorAkkaBenchmark {
4847

4948
lateinit var system: ActorSystem
@@ -72,22 +71,22 @@ open class StatefulActorAkkaBenchmark {
7271
Await.ready(system.terminate(), Duration.Inf())
7372
}
7473

75-
@Benchmark
74+
// @Benchmark
7675
fun singleComputationSingleRequestor() {
7776
run(1, 1)
7877
}
7978

80-
@Benchmark
79+
// @Benchmark
8180
fun singleComputationMultipleRequestors() {
8281
run(1, CORES_COUNT)
8382
}
8483

85-
@Benchmark
84+
// @Benchmark
8685
fun multipleComputationsSingleRequestor() {
8786
run(CORES_COUNT, 1)
8887
}
8988

90-
@Benchmark
89+
// @Benchmark
9190
fun multipleComputationsMultipleRequestors() {
9291
run(CORES_COUNT, CORES_COUNT)
9392
}
@@ -120,7 +119,8 @@ open class StatefulActorAkkaBenchmark {
120119

121120
private fun createComputationActors(initLatch: CountDownLatch, count: Int): List<ActorRef> {
122121
return (0 until count).map {
123-
system.actorOf(Props.create(ComputationActor::class.java,
122+
system.actorOf(Props.create(
123+
ComputationActor::class.java,
124124
LongArray(STATE_SIZE) { ThreadLocalRandom.current().nextLong(0, 100) }, initLatch)
125125
.withDispatcher("akka.actor.$dispatcher"))
126126
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks.scheduler
6+
7+
import benchmarks.akka.*
8+
import kotlinx.coroutines.*
9+
import org.openjdk.jmh.annotations.*
10+
import org.openjdk.jmh.annotations.State
11+
import java.lang.Thread.*
12+
import java.util.concurrent.*
13+
import kotlin.concurrent.*
14+
import kotlin.coroutines.*
15+
16+
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
17+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
18+
@Fork(value = 1)
19+
@BenchmarkMode(Mode.AverageTime)
20+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
21+
@State(Scope.Thread)
22+
open class DispatchersContextSwitchBenchmark {
23+
private val nCoroutines = 10000
24+
private val delayTimeMs = 1L
25+
private val nRepeatDelay = 10
26+
27+
private val fjp = ForkJoinPool.commonPool().asCoroutineDispatcher()
28+
private val ftp = Executors.newFixedThreadPool(CORES_COUNT - 1).asCoroutineDispatcher()
29+
30+
@TearDown
31+
fun teardown() {
32+
ftp.close()
33+
(ftp.executor as ExecutorService).awaitTermination(1, TimeUnit.SECONDS)
34+
}
35+
36+
@Benchmark
37+
fun coroutinesIoDispatcher() = runBenchmark(Dispatchers.IO)
38+
39+
@Benchmark
40+
fun coroutinesDefaultDispatcher() = runBenchmark(Dispatchers.Default)
41+
42+
@Benchmark
43+
fun coroutinesFjpDispatcher() = runBenchmark(fjp)
44+
45+
@Benchmark
46+
fun coroutinesFtpDispatcher() = runBenchmark(ftp)
47+
48+
@Benchmark
49+
fun coroutinesBlockingDispatcher() = runBenchmark(EmptyCoroutineContext)
50+
51+
@Benchmark
52+
fun threads() {
53+
val threads = List(nCoroutines) {
54+
thread(start = true) {
55+
repeat(nRepeatDelay) {
56+
sleep(delayTimeMs)
57+
}
58+
}
59+
}
60+
threads.forEach { it.join() }
61+
}
62+
63+
private fun runBenchmark(dispatcher: CoroutineContext) = runBlocking {
64+
repeat(nCoroutines) {
65+
launch(dispatcher) {
66+
repeat(nRepeatDelay) {
67+
delay(delayTimeMs)
68+
}
69+
}
70+
}
71+
}
72+
}
73+

benchmarks/src/jmh/kotlin/benchmarks/ForkJoinBenchmark.kt renamed to benchmarks/src/jmh/kotlin/benchmarks/scheduler/ForkJoinBenchmark.kt

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5-
package benchmarks
5+
package benchmarks.scheduler
66

7+
import benchmarks.*
78
import kotlinx.coroutines.*
89
import org.openjdk.jmh.annotations.*
910
import java.util.concurrent.*
@@ -44,7 +45,7 @@ open class ForkJoinBenchmark : ParametrizedDispatcherBase() {
4445
}
4546

4647
lateinit var coefficients: LongArray
47-
override var dispatcher: String = "experimental"
48+
override var dispatcher: String = "scheduler"
4849

4950
@Setup
5051
override fun setup() {
@@ -129,8 +130,18 @@ open class ForkJoinBenchmark : ParametrizedDispatcherBase() {
129130
} else {
130131
pendingCount = 2
131132
// One may fork only once here and executing second task here with looping over firstComplete to be even more efficient
132-
first = RecursiveAction(coefficients, start, start + (end - start) / 2, parent = this).fork()
133-
second = RecursiveAction(coefficients, start + (end - start) / 2, end, parent = this).fork()
133+
first = RecursiveAction(
134+
coefficients,
135+
start,
136+
start + (end - start) / 2,
137+
parent = this
138+
).fork()
139+
second = RecursiveAction(
140+
coefficients,
141+
start + (end - start) / 2,
142+
end,
143+
parent = this
144+
).fork()
134145
}
135146

136147
tryComplete()

benchmarks/src/jmh/kotlin/benchmarks/LaunchBenchmark.kt renamed to benchmarks/src/jmh/kotlin/benchmarks/scheduler/LaunchBenchmark.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5-
package benchmarks
5+
package benchmarks.scheduler
66

7+
import benchmarks.*
78
import kotlinx.coroutines.*
89
import org.openjdk.jmh.annotations.*
910
import java.util.concurrent.*
@@ -21,7 +22,7 @@ import java.util.concurrent.*
2122
@State(Scope.Benchmark)
2223
open class LaunchBenchmark : ParametrizedDispatcherBase() {
2324

24-
@Param("experimental", "fjp")
25+
@Param("scheduler", "fjp")
2526
override var dispatcher: String = "fjp"
2627

2728
private val jobsToLaunch = 100

benchmarks/src/jmh/kotlin/benchmarks/StatefulAwaitsBenchmark.kt renamed to benchmarks/src/jmh/kotlin/benchmarks/scheduler/StatefulAwaitsBenchmark.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5-
package benchmarks
5+
package benchmarks.scheduler
66

7+
import benchmarks.*
78
import kotlinx.coroutines.*
89
import kotlinx.coroutines.channels.*
910
import org.openjdk.jmh.annotations.*
@@ -52,7 +53,7 @@ open class StatefulAsyncBenchmark : ParametrizedDispatcherBase() {
5253
@Param("1", "8", "16")
5354
var jobsCount = 1
5455

55-
@Param("fjp", "ftp_1", "ftp_8")
56+
@Param("fjp", "ftp_1", "dispatcher")
5657
override var dispatcher: String = "fjp"
5758

5859
@Volatile

benchmarks/src/jmh/kotlin/benchmarks/actors/ConcurrentStatefulActorBenchmark.kt renamed to benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/ConcurrentStatefulActorBenchmark.kt

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5-
package benchmarks.actors
5+
package benchmarks.scheduler.actors
66

77
import benchmarks.*
8-
import benchmarks.actors.StatefulActorBenchmark.*
8+
import benchmarks.akka.*
9+
import benchmarks.scheduler.actors.StatefulActorBenchmark.*
910
import kotlinx.coroutines.*
1011
import kotlinx.coroutines.channels.*
1112
import org.openjdk.jmh.annotations.*
@@ -57,18 +58,18 @@ import java.util.concurrent.*
5758
@State(Scope.Benchmark)
5859
open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {
5960

60-
@Param("1024", "8192", "262144")
61+
@Param("1024", "8192")
6162
var stateSize: Int = -1
6263

63-
@Param("fjp", "ftp_1", "ftp_8", "experimental")
64+
@Param("fjp", "scheduler")
6465
override var dispatcher: String = "fjp"
6566

6667
@Benchmark
6768
fun multipleComputationsUnfair() = runBlocking {
6869
val resultChannel: Channel<Unit> = Channel(1)
6970
val computations = (0 until CORES_COUNT).map { computationActor(stateSize) }
7071
val requestor = requestorActorUnfair(computations, resultChannel)
71-
requestor.send(Letter(Start(), Channel(0)))
72+
requestor.send(Letter(Start(), requestor))
7273
resultChannel.receive()
7374
}
7475

@@ -77,7 +78,7 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {
7778
val resultChannel: Channel<Unit> = Channel(1)
7879
val computations = (0 until CORES_COUNT).map { computationActor(stateSize) }
7980
val requestor = requestorActorFair(computations, resultChannel)
80-
requestor.send(Letter(Start(), Channel(0)))
81+
requestor.send(Letter(Start(), requestor))
8182
resultChannel.receive()
8283
}
8384

@@ -95,6 +96,7 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {
9596
}
9697
is Long -> {
9798
if (++received >= ROUNDS * 8) {
99+
computations.forEach { it.close() }
98100
stopChannel.send(Unit)
99101
return@actor
100102
} else {
@@ -122,6 +124,7 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {
122124
}
123125
is Long -> {
124126
if (++receivedTotal >= ROUNDS * computations.size) {
127+
computations.forEach { it.close() }
125128
stopChannel.send(Unit)
126129
return@actor
127130
} else {
@@ -136,4 +139,4 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {
136139
}
137140
}
138141
}
139-
}
142+
}

0 commit comments

Comments
 (0)