Skip to content

Commit 3d22c7a

Browse files
committed
fix(bufferToggle): accepts promise as openings
1 parent 02239fb commit 3d22c7a

File tree

2 files changed

+68
-49
lines changed

2 files changed

+68
-49
lines changed

spec/operators/bufferToggle-spec.ts

+44-2
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,48 @@ describe('Observable.prototype.bufferToggle', () => {
343343
expectSubscriptions(e2.subscriptions).toBe(e2subs);
344344
});
345345

346+
it('should accept openings resolved promise', (done: MochaDone) => {
347+
const e1 = Observable.concat(
348+
Observable.timer(10).mapTo(1),
349+
Observable.timer(100).mapTo(2),
350+
Observable.timer(150).mapTo(3),
351+
Observable.timer(200).mapTo(4));
352+
353+
const expected = [[1]];
354+
355+
e1.bufferToggle(new Promise((resolve: any) => { resolve(42); }), () => {
356+
return Observable.timer(50);
357+
}).subscribe((x) => {
358+
expect(x).to.deep.equal(expected.shift());
359+
}, (x) => {
360+
done(new Error('should not be called'));
361+
}, () => {
362+
expect(expected.length).to.be.equal(0);
363+
done();
364+
});
365+
});
366+
367+
it('should accept openings rejected promise', (done: MochaDone) => {
368+
const e1 = Observable.concat(Observable.of(1),
369+
Observable.timer(10).mapTo(2),
370+
Observable.timer(10).mapTo(3),
371+
Observable.timer(100).mapTo(4)
372+
);
373+
374+
const expected = 42;
375+
376+
e1.bufferToggle(new Promise((resolve: any, reject: any) => { reject(expected); }), () => {
377+
return Observable.timer(50);
378+
}).subscribe((x) => {
379+
done(new Error('should not be called'));
380+
}, (x) => {
381+
expect(x).to.equal(expected);
382+
done();
383+
}, () => {
384+
done(new Error('should not be called'));
385+
});
386+
});
387+
346388
it('should accept closing selector that returns a resolved promise', (done: MochaDone) => {
347389
const e1 = Observable.concat(Observable.of(1),
348390
Observable.timer(10).mapTo(2),
@@ -357,8 +399,8 @@ describe('Observable.prototype.bufferToggle', () => {
357399
}, () => {
358400
done(new Error('should not be called'));
359401
}, () => {
360-
expect(expected.length).to.be.equal(0);
361-
done();
402+
expect(expected.length).to.be.equal(0);
403+
done();
362404
});
363405
});
364406

src/operator/bufferToggle.ts

+24-47
Original file line numberDiff line numberDiff line change
@@ -35,29 +35,29 @@ import {InnerSubscriber} from '../InnerSubscriber';
3535
* @see {@link bufferWhen}
3636
* @see {@link windowToggle}
3737
*
38-
* @param {Observable<O>} openings An observable of notifications to start new
38+
* @param {SubscribableOrPromise<O>} openings A Subscribable or Promise of notifications to start new
3939
* buffers.
40-
* @param {function(value: O): Observable} closingSelector A function that takes
40+
* @param {function(value: O): SubscribableOrPromise} closingSelector A function that takes
4141
* the value emitted by the `openings` observable and returns a Subscribable or Promise,
4242
* which, when it emits, signals that the associated buffer should be emitted
4343
* and cleared.
4444
* @return {Observable<T[]>} An observable of arrays of buffered values.
4545
* @method bufferToggle
4646
* @owner Observable
4747
*/
48-
export function bufferToggle<T, O>(openings: Observable<O>,
49-
closingSelector: (value: O) => SubscribableOrPromise<any> | void): Observable<T[]> {
48+
export function bufferToggle<T, O>(openings: SubscribableOrPromise<O>,
49+
closingSelector: (value: O) => SubscribableOrPromise<any>): Observable<T[]> {
5050
return this.lift(new BufferToggleOperator<T, O>(openings, closingSelector));
5151
}
5252

5353
export interface BufferToggleSignature<T> {
54-
<O>(openings: Observable<O>, closingSelector: (value: O) => SubscribableOrPromise<any> | void): Observable<T[]>;
54+
<O>(openings: SubscribableOrPromise<O>, closingSelector: (value: O) => SubscribableOrPromise<any>): Observable<T[]>;
5555
}
5656

5757
class BufferToggleOperator<T, O> implements Operator<T, T[]> {
5858

59-
constructor(private openings: Observable<O>,
60-
private closingSelector: (value: O) => SubscribableOrPromise<any> | void) {
59+
constructor(private openings: SubscribableOrPromise<O>,
60+
private closingSelector: (value: O) => SubscribableOrPromise<any>) {
6161
}
6262

6363
call(subscriber: Subscriber<T[]>, source: any): any {
@@ -79,10 +79,10 @@ class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {
7979
private contexts: Array<BufferContext<T>> = [];
8080

8181
constructor(destination: Subscriber<T[]>,
82-
private openings: Observable<O>,
82+
private openings: SubscribableOrPromise<O>,
8383
private closingSelector: (value: O) => SubscribableOrPromise<any> | void) {
8484
super(destination);
85-
this.add(this.openings.subscribe(new BufferToggleOpeningsSubscriber(this)));
85+
this.add(subscribeToResult(this, openings));
8686
}
8787

8888
protected _next(value: T): void {
@@ -118,7 +118,17 @@ class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {
118118
super._complete();
119119
}
120120

121-
openBuffer(value: O): void {
121+
notifyNext(outerValue: any, innerValue: O,
122+
outerIndex: number, innerIndex: number,
123+
innerSub: InnerSubscriber<T, O>): void {
124+
outerValue ? this.closeBuffer(outerValue) : this.openBuffer(innerValue);
125+
}
126+
127+
notifyComplete(innerSub: InnerSubscriber<T, O>): void {
128+
this.closeBuffer((<any> innerSub).context);
129+
}
130+
131+
private openBuffer(value: O): void {
122132
try {
123133
const closingSelector = this.closingSelector;
124134
const closingNotifier = closingSelector.call(this, value);
@@ -130,16 +140,6 @@ class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {
130140
}
131141
}
132142

133-
notifyNext(outerValue: any, innerValue: O,
134-
outerIndex: number, innerIndex: number,
135-
innerSub: InnerSubscriber<T, O>): void {
136-
this.closeBuffer(outerValue);
137-
}
138-
139-
notifyComplete(innerSub: InnerSubscriber<T, O>): void {
140-
this.closeBuffer((<any> innerSub).context);
141-
}
142-
143143
private closeBuffer(context: BufferContext<T>): void {
144144
const contexts = this.contexts;
145145

@@ -162,36 +162,13 @@ class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {
162162

163163
const innerSubscription = subscribeToResult(this, closingNotifier, <any>context);
164164

165-
if (!innerSubscription.isUnsubscribed) {
165+
if (!innerSubscription || innerSubscription.isUnsubscribed) {
166+
this.closeBuffer(context);
167+
} else {
166168
(<any> innerSubscription).context = context;
167169

168170
this.add(innerSubscription);
169171
subscription.add(innerSubscription);
170-
} else {
171-
this.closeBuffer(context);
172172
}
173173
}
174-
}
175-
176-
/**
177-
* We need this JSDoc comment for affecting ESDoc.
178-
* @ignore
179-
* @extends {Ignored}
180-
*/
181-
class BufferToggleOpeningsSubscriber<T, O> extends Subscriber<O> {
182-
constructor(private parent: BufferToggleSubscriber<T, O>) {
183-
super(null);
184-
}
185-
186-
protected _next(value: O) {
187-
this.parent.openBuffer(value);
188-
}
189-
190-
protected _error(err: any) {
191-
this.parent.error(err);
192-
}
193-
194-
protected _complete() {
195-
// noop
196-
}
197-
}
174+
}

0 commit comments

Comments
 (0)