Skip to content
This repository was archived by the owner on Feb 26, 2024. It is now read-only.

Commit 7f3d467

Browse files
committed
add comment, patch Observable.create
1 parent 26c83cb commit 7f3d467

File tree

2 files changed

+103
-15
lines changed

2 files changed

+103
-15
lines changed

lib/rxjs/rxjs.ts

+58-15
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ declare let define: any;
2828

2929
const Observable = Rx.Observable;
3030

31+
// monkey-patch Observable to save the
32+
// current zone as ConstructorZone
3133
Rx.Observable = function() {
3234
Observable.apply(this, arguments);
3335
this._zone = Zone.current;
@@ -38,38 +40,56 @@ declare let define: any;
3840

3941
const subscribe = Observable.prototype.subscribe;
4042
const lift = Observable.prototype.lift;
43+
const create = Observable.create;
4144

45+
// patch Observable.prototype.subscribe
46+
// if SubscripitionZone is different with ConstructorZone
47+
// we should run _subscribe in ConstructorZone and
48+
// create sinke in SubscriptionZone,
49+
// and tearDown should also run into ConstructorZone
4250
Observable.prototype.subscribe = function() {
4351
const _zone = this._zone;
4452
const currentZone = Zone.current;
4553

54+
// patch inner function this._subscribe to check
55+
// SubscriptionZone is same with ConstuctorZone or not
4656
if (this._subscribe && typeof this._subscribe === 'function') {
4757
this._subscribe._zone = this._zone;
4858
const _subscribe = this._subscribe;
4959
if (_zone) {
5060
this._subscribe = function() {
5161
const args = Array.prototype.slice.call(arguments);
5262
const subscriber = args.length > 0 ? args[0] : undefined;
63+
// also keep currentZone in Subscriber
64+
// for later Subscriber.next/error/complete method
5365
if (subscriber && !subscriber._zone) {
5466
subscriber._zone = currentZone;
5567
}
68+
// _subscribe should run in ConstructorZone
69+
// but for performance concern, we should check
70+
// whether ConsturctorZone === Zone.current here
5671
const tearDownLogic = _zone !== Zone.current ? _zone.run(_subscribe, this, args) :
5772
_subscribe.apply(this, args);
5873
if (tearDownLogic && typeof tearDownLogic === 'function') {
59-
const patchedTeadDownLogic = function() {
74+
const patchedTearDownLogic = function() {
75+
// tearDownLogic should also run in ConstructorZone
76+
// but for performance concern, we should check
77+
// whether ConsturctorZone === Zone.current here
6078
if (_zone && _zone !== Zone.current) {
6179
return _zone.run(tearDownLogic, this, arguments);
6280
} else {
6381
return tearDownLogic.apply(this, arguments);
6482
}
6583
};
66-
return patchedTeadDownLogic;
84+
return patchedTearDownLogic;
6785
}
6886
return tearDownLogic;
6987
};
7088
}
7189
}
7290

91+
// if operator is involved, we should also
92+
// patch the call method to save the Subscription zone
7393
if (this.operator && _zone && _zone !== currentZone) {
7494
const call = this.operator.call;
7595
this.operator.call = function() {
@@ -82,65 +102,88 @@ declare let define: any;
82102
};
83103
}
84104
const result = subscribe.apply(this, arguments);
105+
// clean up _subscribe._zone to prevent
106+
// the same _subscribe being used in multiple
107+
// Observable instances.
85108
if (this._subscribe) {
86109
this._subscribe._zone = undefined;
87110
}
88-
result._zone = Zone.current;
111+
// the result is the subscriber sink,
112+
// we save the current Zone here
113+
result._zone = currentZone;
89114
return result;
90115
};
91116

117+
// patch lift method to save ConstructorZone of Observable
92118
Observable.prototype.lift = function() {
93119
const observable = lift.apply(this, arguments);
94120
observable._zone = Zone.current;
95121
return observable;
96122
};
97123

124+
// patch create method to save ConstructorZone of Observable
125+
Rx.Observable.create = function() {
126+
const observable = create.apply(this, arguments);
127+
observable._zone = Zone.current;
128+
return observable;
129+
};
130+
98131
const Subscriber = Rx.Subscriber;
99132

100133
const next = Subscriber.prototype.next;
101134
const error = Subscriber.prototype.error;
102135
const complete = Subscriber.prototype.complete;
103136
const unsubscribe = Subscriber.prototype.unsubscribe;
104137

138+
// patch Subscriber.next to make sure it run
139+
// into SubscriptionZone
105140
Subscriber.prototype.next = function() {
106141
const currentZone = Zone.current;
107-
const observableZone = this._zone;
142+
const subscriptionZone = this._zone;
108143

109-
if (observableZone && observableZone !== currentZone) {
110-
return observableZone.run(next, this, arguments, nextSource);
144+
// for performance concern, check Zone.current
145+
// equal with this._zone(SubscriptionZone) or not
146+
if (subscriptionZone && subscriptionZone !== currentZone) {
147+
return subscriptionZone.run(next, this, arguments, nextSource);
111148
} else {
112149
return next.apply(this, arguments);
113150
}
114151
};
115152

116153
Subscriber.prototype.error = function() {
117154
const currentZone = Zone.current;
118-
const observableZone = this._zone;
155+
const subscriptionZone = this._zone;
119156

120-
if (observableZone && observableZone !== currentZone) {
121-
return observableZone.run(error, this, arguments, errorSource);
157+
// for performance concern, check Zone.current
158+
// equal with this._zone(SubscriptionZone) or not
159+
if (subscriptionZone && subscriptionZone !== currentZone) {
160+
return subscriptionZone.run(error, this, arguments, nextSource);
122161
} else {
123162
return error.apply(this, arguments);
124163
}
125164
};
126165

127166
Subscriber.prototype.complete = function() {
128167
const currentZone = Zone.current;
129-
const observableZone = this._zone;
168+
const subscriptionZone = this._zone;
130169

131-
if (observableZone && observableZone !== currentZone) {
132-
return observableZone.run(complete, this, arguments, completeSource);
170+
// for performance concern, check Zone.current
171+
// equal with this._zone(SubscriptionZone) or not
172+
if (subscriptionZone && subscriptionZone !== currentZone) {
173+
return subscriptionZone.run(complete, this, arguments, nextSource);
133174
} else {
134175
return complete.apply(this, arguments);
135176
}
136177
};
137178

138179
Subscriber.prototype.unsubscribe = function() {
139180
const currentZone = Zone.current;
140-
const observableZone = this._zone;
181+
const subscriptionZone = this._zone;
141182

142-
if (observableZone && observableZone !== currentZone) {
143-
return observableZone.run(unsubscribe, this, arguments, unsubscribeSource);
183+
// for performance concern, check Zone.current
184+
// equal with this._zone(SubscriptionZone) or not
185+
if (subscriptionZone && subscriptionZone !== currentZone) {
186+
return subscriptionZone.run(unsubscribe, this, arguments, nextSource);
144187
} else {
145188
return unsubscribe.apply(this, arguments);
146189
}

test/rxjs/rxjs.spec.ts

+45
Original file line numberDiff line numberDiff line change
@@ -150,4 +150,49 @@ describe('Zone interaction', () => {
150150
expect(log).toEqual(['map: MyValue', 'next', 'complete', 'cleanup']);
151151
});
152152

153+
it('should run operators in the zone of declaration with Observable.create', () => {
154+
const log: string[] = [];
155+
const rootZone: Zone = Zone.current;
156+
const constructorZone: Zone = Zone.current.fork({name: 'Constructor Zone'});
157+
const operatorZone: Zone = Zone.current.fork({name: 'Operator Zone'});
158+
const subscriptionZone: Zone = Zone.current.fork({name: 'Subscription Zone'});
159+
let observable: any = constructorZone.run(() => Observable.create((subscriber: any) => {
160+
// Execute the `next`/`complete` in different zone, and assert that
161+
// correct zone
162+
// is restored.
163+
rootZone.run(() => {
164+
subscriber.next('MyValue');
165+
subscriber.complete();
166+
});
167+
return () => {
168+
expect(Zone.current.name).toEqual(constructorZone.name);
169+
log.push('cleanup');
170+
};
171+
}));
172+
173+
observable = operatorZone.run(() => observable.map((value: any) => {
174+
expect(Zone.current.name).toEqual(operatorZone.name);
175+
log.push('map: ' + value);
176+
return value;
177+
}));
178+
179+
subscriptionZone.run(
180+
() => observable.subscribe(
181+
() => {
182+
expect(Zone.current.name).toEqual(subscriptionZone.name);
183+
log.push('next');
184+
},
185+
(e: any) => {
186+
expect(Zone.current.name).toEqual(subscriptionZone.name);
187+
log.push('error: ' + e);
188+
},
189+
() => {
190+
expect(Zone.current.name).toEqual(subscriptionZone.name);
191+
log.push('complete');
192+
}));
193+
194+
expect(log).toEqual(['map: MyValue', 'next', 'complete', 'cleanup']);
195+
});
196+
197+
153198
});

0 commit comments

Comments
 (0)