This repository was archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathreduce.js
73 lines (65 loc) · 2.48 KB
/
reduce.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
var ReduceObservable = (function(__super__) {
inherits(ReduceObservable, __super__);
function ReduceObservable(source, accumulator, hasSeed, seed) {
this.source = source;
this.accumulator = accumulator;
this.hasSeed = hasSeed;
this.seed = seed;
__super__.call(this);
}
ReduceObservable.prototype.subscribeCore = function(observer) {
return this.source.subscribe(new ReduceObserver(observer,this));
};
return ReduceObservable;
}(ObservableBase));
var ReduceObserver = (function (__super__) {
inherits(ReduceObserver, __super__);
function ReduceObserver(o, parent) {
this._o = o;
this._p = parent;
this._fn = parent.accumulator;
this._hs = parent.hasSeed;
this._s = parent.seed;
this._ha = false;
this._a = null;
this._hv = false;
this._i = 0;
__super__.call(this);
}
ReduceObserver.prototype.next = function (x) {
!this._hv && (this._hv = true);
if (this._ha) {
this._a = tryCatch(this._fn)(this._a, x, this._i, this._p);
} else {
this._a = this._hs ? tryCatch(this._fn)(this._s, x, this._i, this._p) : x;
this._ha = true;
}
if (this._a === errorObj) { return this._o.onError(this._a.e); }
this._i++;
};
ReduceObserver.prototype.error = function (e) {
this._o.onError(e);
};
ReduceObserver.prototype.completed = function () {
this._hv && this._o.onNext(this._a);
!this._hv && this._hs && this._o.onNext(this._s);
!this._hv && !this._hs && this._o.onError(new EmptyError());
this._o.onCompleted();
};
return ReduceObserver;
}(AbstractObserver));
/**
* Applies an accumulator function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value.
* For aggregation behavior with incremental intermediate results, see Observable.scan.
* @param {Function} accumulator An accumulator function to be invoked on each element.
* @param {Any} [seed] The initial accumulator value.
* @returns {Observable} An observable sequence containing a single element with the final accumulator value.
*/
observableProto.reduce = function () {
var hasSeed = false, seed, accumulator = arguments[0];
if (arguments.length === 2) {
hasSeed = true;
seed = arguments[1];
}
return new ReduceObservable(this, accumulator, hasSeed, seed);
};