-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.js
61 lines (46 loc) · 1.38 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
var Readable = require('readable-stream')
var eos = require('end-of-stream')
var util = require('util')
util.inherits(ContinueStream, Readable)
function ContinueStream(next, options) {
if (!(this instanceof ContinueStream))
return new ContinueStream(next, options)
Readable.call(this, options)
this.next = next
this.nextStream()
}
ContinueStream.prototype.nextStream = function() {
this.next(function(err, source) {
if (err) return this.emit('error', err)
if (!source) return this.push(null)
if (!source._readableState) source = new Readable().wrap(source)
this._source = source
eos(source, function(err) {
if (err) return this.emit('error', err)
this.nextStream()
}.bind(this))
source.on('readable', function() {
this._forward()
}.bind(this))
}.bind(this), this._source)
}
ContinueStream.prototype._read = function() {
this._drained = true
this._forward()
}
ContinueStream.prototype._forward = function() {
if (this._forwarding || !this._source || !this._drained) return
this._forwarding = true
var data
while ((data = this._source.read()) !== null) {
this._drained = this.push(data)
}
this._forwarding = false
}
module.exports = ContinueStream
module.exports.obj = function(next, options) {
options = options || {}
options.objectMode = true
options.highWaterMark = 16
return ContinueStream(next, options)
}