Mathematical and Aggregate Operators
This section explains operators that perform mathematical or other operations over an entire sequence of items emitted by an Observable. Because these operations must wait for the source Observable to complete emitting items before they can construct their own emissions (and must usually buffer these items), these operators are dangerous to use on Observables that may have very long or infinite sequences.
- average( ): calculates the average of Integers emitted by an Observable and emits this average
- averageLongs( ): calculates the average of Longs emitted by an Observable and emits this average
- averageFloats( ): calculates the average of Floats emitted by an Observable and emits this average
- averageDoubles( ): calculates the average of Doubles emitted by an Observable and emits this average
- concat( ): concatenates two or more Observables sequentially
- count( ) and longCount( ): counts the number of items emitted by an Observable and emits this count
- max( ): emits the maximum value emitted by a source Observable
- maxBy( ): emits the item emitted by the source Observable that has the maximum key value
- min( ): emits the minimum value emitted by a source Observable
- minBy( ): emits the item emitted by the source Observable that has the minimum key value
- reduce( ): applies a function to each emitted item, sequentially, and emits only the final accumulated value
- sum( ): adds the Integers emitted by an Observable and emits this sum
- sumLongs( ): adds the Longs emitted by an Observable and emits this sum
- sumFloats( ): adds the Floats emitted by an Observable and emits this sum
- sumDoubles( ): adds the Doubles emitted by an Observable and emits this sum
The average( ) method returns an Observable that calculates the average of the Integers emitted by a source Observable and then emits this average as an Integer, as shown in the following sample code:
def myObservable = Observable.create({ anObserver ->
  anObserver.onNext(4);
  anObserver.onNext(3);
  anObserver.onNext(2);
  anObserver.onNext(1);
  anObserver.onCompleted();
});
myObservable.average().subscribe(
  { println(it); },                  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);
2
Sequence complete
There are also specialized "average" methods for Longs, Floats, and Doubles (averageLongs( ), averageFloats( ), and averageDoubles( )).
Note that these methods will fail with an IllegalArgumentException if the source Observable does not emit any items.
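The sample above prints 2 rather than 2.5 because the average is emitted as an Integer. The following plain-Java sketch models that behavior on a list instead of an Observable. It is not RxJava's implementation: the names `AverageSketch` and `averageOf` are made up for illustration, and it assumes the result is obtained by integer division, which is consistent with the sample output.

```java
import java.util.Arrays;
import java.util.List;

final class AverageSketch {
    // Hypothetical helper: models average( ) over a finite list of Integers.
    static int averageOf(List<Integer> items) {
        if (items.isEmpty()) {
            // Mirrors the documented failure for a source that emits no items.
            throw new IllegalArgumentException("sequence contains no items");
        }
        int sum = 0;
        for (int item : items) {
            sum += item;
        }
        // Assumption: integer division, so the fractional part is dropped.
        return sum / items.size();
    }

    public static void main(String[] args) {
        System.out.println(averageOf(Arrays.asList(4, 3, 2, 1))); // prints 2
    }
}
```

If a fractional result matters, the Float or Double variants listed above are presumably the better fit.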
- javadoc: average(), averageLongs(), averageFloats(), and averageDoubles()
- RxJS: average
- Linq: Average
- Introduction to Rx: Min, Max, Sum, and Average
You can use the concat( ) method to concatenate the output of multiple Observables so that they act like a single Observable, with all of the items emitted by the first Observable being emitted before any of the items emitted by the second Observable:
myConcatenatedObservable = Observable.concat(observable1, observable2, ... );
For example, the following code concatenates the 'odds' and 'evens' Observables into a single Observable:
odds = Observable.from([1, 3, 5, 7]);
evens = Observable.from([2, 4, 6]);
Observable.concat(odds, evens).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
1
3
5
7
2
4
6
Sequence complete
Instead of passing multiple Observables into concat( ), you could also pass in a List<> of Observables, or even an Observable that emits Observables, and concat( ) will concatenate their output into the output of a single Observable.
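As a rough illustration of the ordering guarantee, the sketch below concatenates a list of sources in plain Java, using lists as stand-ins for Observables. The names `ConcatSketch` and `concatAll` are hypothetical and this is not how RxJava implements concat( ); it only shows that each source is drained completely before the next one starts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ConcatSketch {
    // Hypothetical helper: drains each source completely, in order,
    // before moving on to the next one.
    static <T> List<T> concatAll(List<List<T>> sources) {
        List<T> out = new ArrayList<>();
        for (List<T> source : sources) {
            out.addAll(source);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> odds = Arrays.asList(1, 3, 5, 7);
        List<Integer> evens = Arrays.asList(2, 4, 6);
        // prints [1, 3, 5, 7, 2, 4, 6]
        System.out.println(concatAll(Arrays.asList(odds, evens)));
    }
}
```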
- javadoc: concat(observable1, observable2, ...)
- RxJS: concat
- Linq: Concat
- Introduction to Rx: Concat
The count( ) method returns an Observable that emits a single item: an Integer that represents the total number of items emitted by the source Observable, as shown in the following sample code:
def myObservable = Observable.create({ anObserver ->
  anObserver.onNext('Three');
  anObserver.onNext('Two');
  anObserver.onNext('One');
  anObserver.onCompleted();
});
myObservable.count().subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
3
Sequence complete
longCount( ) is essentially the same, but emits its item as a Long rather than an Integer.
- javadoc: count()
- RxJS: count
- Linq: Count
- Introduction to Rx: Count
The max( ) operator waits until the source Observable completes, and then emits the item emitted by the source Observable that had the highest value, before itself completing. If more than one item has this maximum value, max( ) emits the last such item. You may optionally pass in a comparator that max( ) will use to determine the maximum of two emitted items.
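The tie-breaking rule is easiest to see with a comparator under which several items compare as equal. The following plain-Java sketch models the rule described above on a list; `MaxSketch` and `lastMax` are hypothetical names, not RxJava API. Rejecting empty input is an assumption made for this sketch only, since the text does not say what max( ) does when the source emits nothing.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class MaxSketch {
    // Hypothetical helper: returns the last item that compares as largest.
    static <T> T lastMax(List<T> items, Comparator<? super T> comparator) {
        if (items.isEmpty()) {
            // Assumption for this sketch only; see the lead-in.
            throw new IllegalArgumentException("sequence contains no items");
        }
        T best = items.get(0);
        for (T item : items.subList(1, items.size())) {
            // ">=" lets a later item win a tie, so the last maximum is kept.
            if (comparator.compare(item, best) >= 0) {
                best = item;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("pear", "fig", "plum", "kiwi");
        // Compare by length only: "pear", "plum", and "kiwi" tie at 4.
        Comparator<String> byLength = Comparator.comparingInt(String::length);
        System.out.println(lastMax(words, byLength)); // prints kiwi
    }
}
```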
- Linq: Max
- RxJS: max
- Intro to Rx: Max and Min
The maxBy( ) operator is similar to max( ) but instead of emitting the maximum item emitted by the source Observable, it emits the last item from the source Observable that has the maximum key, where that key is generated by a function applied to each item. You supply this function.
- Linq: MaxBy
- RxJS: maxBy
- Intro to Rx: MinBy and MaxBy
The min( ) operator waits until the source Observable completes, and then emits the item emitted by the source Observable that had the lowest value, before itself completing. If more than one item has this minimum value, min( ) emits the last such item. You may optionally pass in a comparator that min( ) will use to determine the minimum of two emitted items.
- Linq: Min
- RxJS: min
- Intro to Rx: Max and Min
The minBy( ) operator is similar to min( ) but instead of emitting the minimum item emitted by the source Observable, it emits the last item from the source Observable that has the minimum key, where that key is generated by a function applied to each item. You supply this function.
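To make the difference between comparing items and comparing keys concrete, here is a plain-Java sketch that models the rule exactly as stated above: the item whose key is smallest wins, and the last such item wins a tie. `MinBySketch` and `lastMinBy` are hypothetical names, the list stands in for an Observable, and rejecting empty input is an assumption of the sketch rather than documented behavior.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class MinBySketch {
    // Hypothetical helper: returns the last item whose key is the minimum.
    static <T, K extends Comparable<? super K>> T lastMinBy(
            List<T> items, Function<? super T, ? extends K> keyOf) {
        if (items.isEmpty()) {
            // Assumption for this sketch only; see the lead-in.
            throw new IllegalArgumentException("sequence contains no items");
        }
        T best = items.get(0);
        K bestKey = keyOf.apply(best);
        for (T item : items.subList(1, items.size())) {
            K key = keyOf.apply(item);
            // "<=" lets a later item with an equal key replace the earlier one.
            if (key.compareTo(bestKey) <= 0) {
                best = item;
                bestKey = key;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("apple", "fig", "banana", "kiwi", "yam");
        Function<String, Integer> length = String::length;
        // "fig" and "yam" share the minimum key 3; the later one is kept.
        System.out.println(lastMinBy(words, length)); // prints yam
    }
}
```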
- Linq: MinBy
- RxJS: minBy
- Intro to Rx: MinBy and MaxBy
The reduce( ) method returns an Observable that applies a function of your choosing to the first two items emitted by a source Observable, then feeds the result of that function along with the third item emitted by the source Observable into the same function, then feeds that result along with the fourth item into the same function, and so on until all items have been emitted by the source Observable. Then it emits the final result from the final call to your function as the sole output from the returned Observable.
Note that if the source Observable does not emit any items, reduce( ) will fail with an IllegalArgumentException.
For example, the following code uses reduce( ) to compute, and then emit as an Observable, the sum of the numbers emitted by the source Observable:
numbers = Observable.from([1, 2, 3, 4, 5]);
numbers.reduce({ a, b -> a+b }).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
15
Sequence complete
This technique, which is called “reduce” in the RxJava context, is sometimes called “aggregate,” “fold,” “accumulate,” “compress,” or “inject” in other programming arenas.
There is also a version of reduce( ) to which you can pass a seed item in addition to an accumulator function:
my_observable.reduce(initial_seed, accumulator_closure)
Note that passing a null seed is not the same as passing no seed: if you pass a seed of null, you seed your reduction with the item null. Note also that if you do pass in a seed, and the source Observable emits no items, reduce( ) will emit the seed and complete normally without error.
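The difference between the two forms can be sketched in plain Java, with a list standing in for the source Observable. The names `ReduceSketch`, `reduceNoSeed`, and `reduceWithSeed` are hypothetical, and this is a model of the behavior described above rather than RxJava's implementation.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

final class ReduceSketch {
    // No seed: the first item starts the accumulation, so an empty
    // input has nothing to start from and is treated as an error.
    static <T> T reduceNoSeed(List<T> items, BinaryOperator<T> accumulator) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("sequence contains no items");
        }
        T acc = items.get(0);
        for (T item : items.subList(1, items.size())) {
            acc = accumulator.apply(acc, item);
        }
        return acc;
    }

    // With a seed (which may be null): the seed starts the accumulation,
    // so an empty input simply yields the seed.
    static <T, R> R reduceWithSeed(List<T> items, R seed, BiFunction<R, T, R> accumulator) {
        R acc = seed;
        for (T item : items) {
            acc = accumulator.apply(acc, item);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> none = Collections.<Integer>emptyList();
        System.out.println(reduceNoSeed(numbers, (a, b) -> a + b));        // prints 15
        System.out.println(reduceWithSeed(numbers, 100, (a, b) -> a + b)); // prints 115
        System.out.println(reduceWithSeed(none, 100, (a, b) -> a + b));    // prints 100
    }
}
```

Because the seed can have a different type from the items, the seeded form can also build up a collection or other summary object from the sequence.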
Imagine you have access to an Observable that emits a sequence of "Movie" objects that correspond to the "coming soon" movies from a theater. These objects include a number of items of information about the movie, including its title and opening day. You could use reduce( ) to convert this sequence of Movie objects into a single list of titles, like this:
getComingSoonSequence()
  .reduce([], { theList, video ->
    theList.add("'" + video.getTitle() + "' (" + video.getOpen() + ")");
    return(theList);
  }).subscribe({ println("Coming Soon: " + it) });
Which might result in something like this:
Coming Soon: ['Botso' (Sept. 30), 'The Act of Killing' (Sept. 30), 'Europa Report' (Sept. 27), 'Salinger' (Sept.27), 'In a World' (Sept. 27)]
- javadoc: reduce(accumulator)
- javadoc: reduce(initialValue, accumulator)
- RxJS: aggregate
- Linq: Aggregate
- Introduction to Rx: Aggregate
The sum( ) method returns an Observable that adds the Integers emitted by a source Observable and then emits this sum as an Integer, as shown in the following sample code:
def myObservable = Observable.create({ anObserver ->
  anObserver.onNext(4);
  anObserver.onNext(3);
  anObserver.onNext(2);
  anObserver.onNext(1);
  anObserver.onCompleted();
});
myObservable.sum().subscribe(
  { println(it); },                  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);
10
Sequence complete
There are also specialized "sum" methods for Longs, Floats, and Doubles (sumLongs( ), sumFloats( ), and sumDoubles( )).
- javadoc: sum(), sumLongs(), sumFloats(), and sumDoubles()
- RxJS: sum
- Linq: Sum
- Introduction to Rx: Min, Max, Sum, and Average
Copyright (c) 2016-present, RxJava Contributors.