This repository was archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathjoin.js
82 lines (72 loc) · 3.33 KB
/
join.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/**
 * Correlates the elements of two sequences based on overlapping durations.
 *
 * Every value from either side is stored in a per-side map when it arrives and
 * removed again when `take(1)` of its duration sequence completes. A newly
 * arrived value is combined, through `resultSelector`, with every value that is
 * currently stored for the opposite side.
 *
 * Errors from either source, from a duration sequence, or reported by
 * `tryCatch` for a selector are forwarded to the observer. Once
 * `resultSelector` fails for one pair, no further pairs are computed for that
 * arriving value.
 *
 * The result completes when a source completes while the other source is
 * already complete or while it has no stored values itself, or when the last
 * stored value of an already completed source is removed.
 *
 * @param {Observable} right The right observable sequence to join elements for.
 * @param {Function} leftDurationSelector A function to select the duration (expressed as an observable sequence) of each element of the left observable sequence, used to determine overlap.
 * @param {Function} rightDurationSelector A function to select the duration (expressed as an observable sequence) of each element of the right observable sequence, used to determine overlap.
 * @param {Function} resultSelector A function invoked to compute a result element for any two overlapping elements of the left and right observable sequences. It is always called as `resultSelector(leftValue, rightValue)`.
 * @returns {Observable} An observable sequence that contains result elements computed from source elements that have an overlapping duration.
 */
observableProto.join = function (right, leftDurationSelector, rightDurationSelector, resultSelector) {
  var left = this;
  return new AnonymousObservable(function (o) {
    var group = new CompositeDisposable();
    // Per-side bookkeeping: whether the source completed, the next id to hand
    // out, and the values whose duration is still open (keyed by id).
    var leftSide = { done: false, nextId: 0, live: new Map() };
    var rightSide = { done: false, nextId: 0, live: new Map() };
    var handleError = function (e) { o.onError(e); };

    // Subscribes to one source. `own` is that source's bookkeeping, `other` is
    // the opposite side's; `ownIsLeft` fixes the argument order for resultSelector.
    function subscribeSide(source, durationSelector, own, other, ownIsLeft) {
      return source.subscribe(
        function (value) {
          var id = own.nextId++, md = new SingleAssignmentDisposable();

          own.live.set(id, value);
          group.add(md);

          var duration = tryCatch(durationSelector)(value);
          if (duration === errorObj) { return o.onError(duration.e); }

          md.setDisposable(duration.take(1).subscribe(
            noop,
            handleError,
            function () {
              // The value's window closed: forget it, and complete if it was the
              // last open value of a source that has already completed.
              own.live['delete'](id) && own.live.size === 0 && own.done && o.onCompleted();
              group.remove(md);
            }));

          // forEach cannot be broken out of, so a flag turns the remaining
          // iterations into no-ops once resultSelector has failed.
          var failed = false;
          other.live.forEach(function (otherValue) {
            if (failed) { return; }
            var result = ownIsLeft ?
              tryCatch(resultSelector)(value, otherValue) :
              tryCatch(resultSelector)(otherValue, value);
            if (result === errorObj) {
              failed = true;
              o.onError(result.e);
              return;
            }
            o.onNext(result);
          });
        },
        handleError,
        function () {
          own.done = true;
          // No further pairs can appear once both sources are done, or once this
          // source is done and none of its values is still open.
          (other.done || own.live.size === 0) && o.onCompleted();
        });
    }

    group.add(subscribeSide(left, leftDurationSelector, leftSide, rightSide, true));
    group.add(subscribeSide(right, rightDurationSelector, rightSide, leftSide, false));
    return group;
  }, left);
};