Skip to content

Commit c39786a

Browse files
hamdiallamjenkins
authored and
jenkins
committed
util/util-core: skip scheduler on empty waitqueue
Problem: Empty waitqueues are submitted to the scheduler. This will happen when a promise with no continuations is satisfied Solution: Skip empty waitqueues Result: Lets also make WaitQueue.Empty an object to make debugging references easier JIRA Issues: CSL-11517 Differential Revision: https://phabricator.twitter.biz/D796346
1 parent 8776049 commit c39786a

File tree

3 files changed

+44
-4
lines changed

3 files changed

+44
-4
lines changed

util-core/src/main/scala/com/twitter/util/Promise.scala

+6-3
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,11 @@ object Promise {
9292
loop(this, WaitQueue.empty)
9393
}
9494

95-
final def runInScheduler(t: Try[A]): Unit =
96-
Scheduler.submit(new Runnable() { def run(): Unit = WaitQueue.this.run(t) })
95+
final def runInScheduler(t: Try[A]): Unit = {
96+
if (this ne WaitQueue.Empty) {
97+
Scheduler.submit(() => WaitQueue.this.run(t))
98+
}
99+
}
97100

98101
@tailrec
99102
private def run(t: Try[A]): Unit =
@@ -107,7 +110,7 @@ object Promise {
107110

108111
private[util] object WaitQueue {
109112

110-
val Empty: WaitQueue[Nothing] = new WaitQueue[Nothing] {
113+
object Empty extends WaitQueue[Nothing] {
111114
final def first: K[Nothing] =
112115
throw new IllegalStateException("WaitQueue.Empty")
113116

util-core/src/test/scala/com/twitter/concurrent/SchedulerTest.scala

-1
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,6 @@ abstract class LocalSchedulerTest(lifo: Boolean) extends AnyFunSuite with Matche
150150
val record = sampleBlockingFraction(0.0)
151151
assert(record == null)
152152
}
153-
154153
}
155154

156155
class LocalSchedulerFifoTest extends LocalSchedulerTest(false)

util-core/src/test/scala/com/twitter/util/PromiseTest.scala

+38
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,20 @@
11
package com.twitter.util
22

3+
import com.twitter.concurrent.Scheduler
4+
import com.twitter.concurrent.LocalScheduler
35
import com.twitter.conversions.DurationOps._
46
import org.scalatest.funsuite.AnyFunSuite
57

68
class PromiseTest extends AnyFunSuite {
79

10+
class TrackingScheduler extends LocalScheduler(false) {
11+
@volatile var submitted: Int = 0
12+
override def submit(r: Runnable): Unit = {
13+
submitted = submitted + 1
14+
super.submit(r)
15+
}
16+
}
17+
818
test("Promise.detach should not detach other attached promises") {
919
val p = new Promise[Unit]
1020
val attached1 = Promise.attached(p)
@@ -305,4 +315,32 @@ class PromiseTest extends AnyFunSuite {
305315
assert("main thread" == Await.result(p2, 3.seconds))
306316
assert("main thread" == Await.result(p, 3.seconds))
307317
}
318+
319+
test("promise with no continuations is not submitted to the scheduler") {
320+
val old = Scheduler()
321+
try {
322+
val sched = new TrackingScheduler()
323+
Scheduler.setUnsafe(sched)
324+
val p = new Promise[Unit]()
325+
p.setDone()
326+
assert(sched.submitted == 0)
327+
} finally {
328+
Scheduler.setUnsafe(old)
329+
}
330+
}
331+
332+
test("registered continuation on satisfied promise is submitted to the scheduler") {
333+
val old = Scheduler()
334+
try {
335+
val sched = new TrackingScheduler()
336+
Scheduler.setUnsafe(sched)
337+
val p = new Promise[Unit]()
338+
p.setDone()
339+
340+
p.ensure(())
341+
assert(sched.submitted == 1)
342+
} finally {
343+
Scheduler.setUnsafe(old)
344+
}
345+
}
308346
}

0 commit comments

Comments
 (0)