diff --git a/readme-src.coffee b/readme-src.coffee index 1748769b8..fcb7895bf 100644 --- a/readme-src.coffee +++ b/readme-src.coffee @@ -975,6 +975,15 @@ doc.marble() .input("Bacon.sequentially(200, [9,0,2]).filter(function(x) { return x })") .input("Bacon.sequentially(200, [0,1,0,12,8,0]).filter(function(x) { return x })") .output("function(a,b) { return a.concat(b) }") + +doc.fn "stream.bufferingConcat(otherStream)", """ +like [`concat`](#stream-concat), but buffers all events from `otherStream` until `stream` ends. +""" + +doc.marble() + .input("Bacon.sequentially(200, [9,0,2]).filter(function(x) { return x })") + .input("Bacon.sequentially(200, [0,1,0,12,8,0]).filter(function(x) { return x })") + .output("function(a,b) { return a.bufferingConcat(b) }") doc.fn "stream.merge(otherStream)", """ merges two streams into one stream that delivers events from both diff --git a/spec/specs/bufferingconcat.coffee b/spec/specs/bufferingconcat.coffee new file mode 100644 index 000000000..9ec7c3a40 --- /dev/null +++ b/spec/specs/bufferingconcat.coffee @@ -0,0 +1,51 @@ +# build-dependencies: startwith, filter, delay, interval, take, bus, mapEnd, map, merge, doaction, holdwhen, takeWhile + +describe "EventStream.bufferingConcat", -> + describe "provides values from streams in given order and ends when both are exhausted", -> + expectStreamEvents( + -> + left = series(2, [1, error(), 2, 3]) + right = series(1, [4, 5, 6]) + left.bufferingConcat(right) + [1, error(), 2, 3, 4, 5, 6], unstable) + describe "respects subscriber return value when providing events from left stream", -> + expectStreamEvents( + -> + left = take(3, repeat(3, [1, 3])) + right = take(3, repeat(2, [1])) + left.bufferingConcat(right).takeWhile(lessThan(2)) + [1]) + describe "respects subscriber return value when providing events from right stream", -> + expectStreamEvents( + -> + left = series(3, [1, 2]) + right = series(2, [2, 3, 4, 6]) + left.bufferingConcat(right).takeWhile(lessThan(4)) + [1, 2, 2, 3], unstable) + describe "works with Bacon.never()", -> + expectStreamEvents( + -> Bacon.never().bufferingConcat(Bacon.never()) + []) + describe "works with Bacon.once()", -> + expectStreamEvents( + -> once(2).bufferingConcat(once(1)) + [2, 1]) + describe "works with Bacon.once() and Bacon.never()", -> + expectStreamEvents( + -> once(1).bufferingConcat(Bacon.never()) + [1]) + describe "works with Bacon.never() and Bacon.once()", -> + expectStreamEvents( + -> Bacon.never().bufferingConcat(once(1)) + [1]) + describe "works with Bacon.once() and async source", -> + expectStreamEvents( + -> once(1).bufferingConcat(series(1, [2, 3])) + [1, 2, 3]) + describe "works with Bacon.once() and fromArray()", -> + expectStreamEvents( + -> once(1).bufferingConcat(fromArray([2, 3])) + [1, 2, 3], semiunstable) + it "toString", -> + expect(Bacon.never().bufferingConcat(Bacon.never()).toString()).to.equal("Bacon.never().bufferingConcat(Bacon.never())") + diff --git a/src/bufferingconcat.coffee b/src/bufferingconcat.coffee new file mode 100644 index 000000000..1b921c616 --- /dev/null +++ b/src/bufferingconcat.coffee @@ -0,0 +1,8 @@ +# build-dependencies: core, eventstream, holdwhen, startwith, map, mapend, merge + +Bacon.EventStream :: bufferingConcat = (right) -> + left = this + new EventStream describe(left, "bufferingConcat", right), (sink) -> + bufferedRight = right.holdWhen(left.map(true).startWith(true).mapEnd(false)) + unsubStream = left.merge(bufferedRight).subscribe sink + -> unsubStream() diff --git a/src/main.js b/src/main.js index cb542d202..22158892d 100644 --- a/src/main.js +++ b/src/main.js @@ -1,6 +1,7 @@ // build-dependencies: awaiting // build-dependencies: boolean // build-dependencies: buffer +// build-dependencies: bufferingconcat // build-dependencies: bufferingthrottle // build-dependencies: bus // build-dependencies: callback