Skip to content

Commit caf528e

Browse files
vkostyukovjenkins
authored and
jenkins
committed
util-core: Improve performance and simplify Var.collects
Problem `Var.collectIndependent` is pretty slow as it has to copy an entire result (array) on every update of every individual var. Solution Use similar technic from Future.collect and avoid intermediate copies of the result by wrapping a mutable array with an immutable interface. Result This also drops higher-kinded API for both Activity's and Var's collect methods. It can't be a generic CC[X] anymore and has to be Seq, just like in Future.collect. JIRA Issues: CSL-11377 Differential Revision: https://phabricator.twitter.biz/D795123
1 parent 86c9f42 commit caf528e

File tree

4 files changed

+43
-69
lines changed

4 files changed

+43
-69
lines changed

CHANGELOG.rst

+7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ Note that ``PHAB_ID=#`` and ``RB_ID=#`` correspond to associated messages in com
77
Unreleased
88
----------
99

10+
Breaking API Changes
11+
~~~~~~~~~~~~~~~~~~~~
12+
* util-core: `Activity.collect*` and `Var.collect*` are now implemented in terms of known collection
13+
type `scala.collection.Seq` versus HKT `CC[X]` before. This allows for certain performance
14+
enhancements as well as makes it more aligned with the `Future.collect` APIs.
15+
``PHAB_ID=D795123``
16+
1017
21.11.0
1118
-------
1219

util-core/src/main/scala/com/twitter/util/Activity.scala

+16-29
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
package com.twitter.util
22

33
import java.util.{List => JList}
4-
4+
import scala.collection.mutable.ArrayBuffer
55
import scala.collection.mutable.Buffer
66
import scala.jdk.CollectionConverters._
7-
import scala.language.higherKinds
8-
import scala.reflect.ClassTag
97
import scala.util.control.NonFatal
10-
import scala.Iterable
11-
import scala.collection.compat._
8+
import scala.collection.{Seq => AnySeq}
129

1310
/**
1411
* Activity, like [[com.twitter.util.Var Var]], is a value that varies over
@@ -195,11 +192,7 @@ object Activity {
195192
*
196193
* @example def collect[T](activities: Coll[Activity[T]]): Activity[Coll[T]]
197194
*/
198-
def collect[T: ClassTag, CC[X] <: Iterable[X]](
199-
acts: CC[Activity[T]]
200-
)(
201-
implicit factory: Factory[T, CC[T]]
202-
): Activity[CC[T]] = {
195+
def collect[T](acts: AnySeq[Activity[T]]): Activity[Seq[T]] = {
203196
collect(acts, false)
204197
}
205198

@@ -213,32 +206,26 @@ object Activity {
213206
*
214207
* @example def collectIndependent[T](activities: Coll[Activity[T]]): Activity[Coll[T]]
215208
*/
216-
def collectIndependent[T: ClassTag, CC[X] <: Iterable[X]](
217-
acts: CC[Activity[T]]
218-
)(
219-
implicit factory: Factory[T, CC[T]]
220-
): Activity[CC[T]] = {
209+
def collectIndependent[T](acts: AnySeq[Activity[T]]): Activity[Seq[T]] = {
221210
collect(acts, true)
222211
}
223212

224-
private[this] def collect[T: ClassTag, CC[X] <: Iterable[X]](
225-
acts: CC[Activity[T]],
213+
private[this] def collect[T](
214+
acts: AnySeq[Activity[T]],
226215
collectIndependent: Boolean
227-
)(
228-
implicit factory: Factory[T, CC[T]]
229-
): Activity[CC[T]] = {
216+
): Activity[Seq[T]] = {
230217
if (acts.isEmpty)
231-
return Activity.value(factory.fromSpecific(Nil))
218+
return Activity.value(Seq.empty[T])
232219

233-
val states: Iterable[Var[State[T]]] = acts.map(_.run)
234-
val stateVar: Var[Iterable[State[T]]] = if (collectIndependent) {
220+
val states: AnySeq[Var[State[T]]] = acts.map(_.run)
221+
val stateVar: Var[Seq[State[T]]] = if (collectIndependent) {
235222
Var.collectIndependent(states)
236223
} else {
237224
Var.collect(states)
238225
}
239226

240-
def flip(states: Iterable[State[T]]): State[CC[T]] = {
241-
val notOk = states find {
227+
def flip(states: AnySeq[State[T]]): State[Seq[T]] = {
228+
val notOk = states.find {
242229
case Pending | Failed(_) => true
243230
case Ok(_) => false
244231
}
@@ -250,16 +237,16 @@ object Activity {
250237
case Some(_) => assert(false)
251238
}
252239

253-
val ts = factory.newBuilder
254-
states foreach {
240+
val ts = new ArrayBuffer[T](states.size)
241+
states.foreach {
255242
case Ok(t) => ts += t
256243
case _ => assert(false)
257244
}
258245

259-
Ok(ts.result())
246+
Ok(ts.toSeq)
260247
}
261248

262-
Activity(stateVar map flip)
249+
Activity(stateVar.map(flip))
263250
}
264251

265252
/**

util-core/src/main/scala/com/twitter/util/Var.scala

+20-23
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@ import java.util.concurrent.atomic.AtomicLong
44
import java.util.concurrent.atomic.AtomicReference
55
import java.util.{List => JList}
66
import scala.annotation.tailrec
7-
import scala.collection.compat._
7+
import scala.collection.{Seq => AnySeq}
8+
import scala.collection.compat.immutable.ArraySeq
89
import scala.collection.immutable
910
import scala.collection.mutable.ArrayBuffer
1011
import scala.collection.mutable.Buffer
1112
import scala.jdk.CollectionConverters._
1213
import scala.language.higherKinds
13-
import scala.reflect.ClassTag
14-
import scala.Iterable
1514

1615
/**
1716
* Vars are values that vary over time. To create one, you must give it an
@@ -311,12 +310,8 @@ object Var {
311310
* // refCollectIndependent == Vector((1,2), (2,2), (2,4))
312311
* }}}
313312
*/
314-
def collect[T: ClassTag, CC[X] <: Iterable[X]](
315-
vars: CC[Var[T]]
316-
)(
317-
implicit factory: Factory[T, CC[T]]
318-
): Var[CC[T]] = {
319-
val vs = vars.toArray
313+
def collect[T](vars: AnySeq[Var[T]]): Var[Seq[T]] = {
314+
val vs = vars.toIndexedSeq
320315

321316
def tree(begin: Int, end: Int): Var[Seq[T]] =
322317
if (begin == end) Var(Seq.empty)
@@ -330,7 +325,7 @@ object Var {
330325
} yield left ++ right
331326
}
332327

333-
tree(0, vs.length).map(_.to(factory))
328+
tree(0, vs.length)
334329
}
335330

336331
/**
@@ -354,12 +349,8 @@ object Var {
354349
* // refCollectIndependent == Vector((1,2), (2,2), (2,4))
355350
* }}}
356351
*/
357-
def collectIndependent[T: ClassTag, CC[X] <: Iterable[X]](
358-
vars: CC[Var[T]]
359-
)(
360-
implicit factory: Factory[T, CC[T]]
361-
): Var[CC[T]] =
362-
async(factory.newBuilder.result()) { u =>
352+
def collectIndependent[T](vars: AnySeq[Var[T]]): Var[Seq[T]] =
353+
async(Seq.empty[T]) { u =>
363354
val N = vars.size
364355

365356
// `filling` represents whether or not we have gone through our collection
@@ -377,19 +368,25 @@ object Var {
377368
// @note "filling" only works with the guarantee that the initial `observe` is
378369
// synchronous. This should be the case with Vars since they have an initial value.
379370
@volatile var filling = true
380-
val cur = new Array[T](N)
371+
val cur = new Array[Any](N)
381372

382-
def publish(i: Int, newValue: T): Unit = cur.synchronized {
383-
cur(i) = newValue
384-
if (!filling) u() = cur.to(factory)
373+
def publishAt(i: Int): T => Unit = { newValue =>
374+
cur.synchronized {
375+
cur(i) = newValue
376+
if (!filling) {
377+
u() = ArraySeq.unsafeWrapArray(cur).asInstanceOf[ArraySeq[T]]
378+
}
379+
}
385380
}
386381

387382
val closables = new ArrayBuffer[Closable](N)
388-
val iter = vars.iterator.zipWithIndex
383+
var i = 0
384+
val iter = vars.iterator
389385
while (iter.hasNext) {
390-
val (v, i) = iter.next()
386+
val v = iter.next()
391387
if (i == N - 1) filling = false
392-
closables += v.observe(newValue => publish(i, newValue))
388+
closables += v.observe(publishAt(i))
389+
i += 1
393390
}
394391

395392
Closable.all(closables.toSeq: _*)

util-core/src/test/scala/com/twitter/util/VarTest.scala

-17
Original file line numberDiff line numberDiff line change
@@ -243,23 +243,6 @@ class VarTest extends AnyFunSuite with ScalaCheckDrivenPropertyChecks {
243243
}
244244
}
245245

246-
// This is either very neat or very horrendous,
247-
// depending on your point of view.
248-
test("Var.collect[Set]") {
249-
val vars = Seq(Var(1), Var(2), Var(3))
250-
251-
val coll = Var.collect(vars.map { v => v: Var[Int] }.toSet)
252-
val ref = new AtomicReference[Set[Int]]
253-
coll.changes.register(Witness(ref))
254-
assert(ref.get == Set(1, 2, 3))
255-
256-
vars(1).update(1)
257-
assert(ref.get == Set(1, 3))
258-
259-
vars(1).update(999)
260-
assert(ref.get == Set(1, 999, 3))
261-
}
262-
263246
test("Var.collect: ordering") {
264247
val v1 = Var(1)
265248
val v2 = v1.map(_ * 2)

0 commit comments

Comments
 (0)