-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathindex.js
72 lines (60 loc) · 1.57 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
const getIterator = require('get-iterator')
const toIterable = require('pull-stream-to-async-iterator')
/**
 * Wrap an iterable/iterator as a pull-stream source.
 *
 * The input is passed through `getIterator` once, and the resulting iterator
 * is driven by the returned read function.
 *
 * @param {*} source - Value handed to `getIterator` to obtain an iterator.
 * @returns {Function} Read function `(end, cb)`:
 *   - with a truthy `end`, the iterator's `return()` is awaited when present
 *     and `cb(end)` is called (or `cb(err)` if `return()` fails);
 *   - otherwise `next()` is awaited and the callback receives
 *     `cb(null, value)`, `cb(true)` once the iterator is done, or `cb(err)`
 *     if `next()` fails.
 */
function toPull (source) {
  const iterator = getIterator(source)

  // Abort path: give the iterator a chance to clean up, then echo the
  // abort reason back to the caller.
  const abort = async (reason, cb) => {
    if (iterator.return) {
      try {
        await iterator.return()
      } catch (closeError) {
        return cb(closeError)
      }
    }
    return cb(reason)
  }

  // Read path: fetch one result and translate it into pull-stream terms.
  const pullNext = async cb => {
    let result
    try {
      result = await iterator.next()
    } catch (readError) {
      return cb(readError)
    }
    if (result.done) return cb(true) // eslint-disable-line
    cb(null, result.value)
  }

  return (end, cb) => (end ? abort(end, cb) : pullNext(cb))
}
// `toPull.source` is an alias for the converter itself.
toPull.source = toPull

// Through/transform adapter: the incoming pull-stream `read` is converted
// with `toIterable`, fed to the user's transform function, and the result is
// converted back with `toPull`. Both property names share one function.
toPull.through = source => read => {
  const input = toIterable(read)
  const output = source(input)
  return toPull(output)
}
toPull.transform = toPull.through

// Duplex adapter: converts each half of a `{ sink, source }` pair
// (the sink first, then the source).
toPull.duplex = duplex => {
  const sink = toPull.sink(duplex.sink)
  const source = toPull(duplex.source)
  return { sink, source }
}
/**
 * Convert a function that consumes an async iterator into a pull-stream sink.
 *
 * The returned sink, when given a pull-stream `read` function, calls `sink`
 * once with an object that is both an async iterable (its
 * `Symbol.asyncIterator` method returns itself) and an async iterator:
 *   - `next()`   reads one value; resolves `{ done: false, value }`, resolves
 *                `{ done: true, value }` when `read` reports `end === true`,
 *                and rejects when `read` reports any other truthy `end`.
 *   - `return()` aborts the source with `true`.
 *   - `throw(e)` aborts the source with `e`.
 * Both abort methods resolve `{ done: true, value }`, or reject if the source
 * answers with an `end` other than `true`.
 *
 * The value returned by `sink` is not used, and the pull-stream sink itself
 * returns `undefined`.
 *
 * @param {Function} sink - Called with the async iterator described above.
 * @returns {Function} Pull-stream sink `(read) => undefined`.
 */
toPull.sink = sink => {
  return read => {
    // Shared abort logic for `return` and `throw`.
    const abort = reason => new Promise((resolve, reject) => {
      read(reason, (end, value) => {
        if (end && end !== true) return reject(end)
        resolve({ done: true, value })
      })
    })

    sink({
      [Symbol.asyncIterator] () {
        return this
      },
      next: () => new Promise((resolve, reject) => {
        read(null, (end, value) => {
          if (end === true) return resolve({ done: true, value })
          if (end) return reject(end)
          resolve({ done: false, value })
        })
      }),
      return: () => abort(true),
      // A falsy first argument means "read the next value" to a pull-stream
      // source, so a missing/falsy reason is coerced to `true`; otherwise
      // `throw()` would silently consume a value instead of aborting.
      throw: err => abort(err || true)
    })
  }
}
// Export the converter function; the `source`, `transform`, `through`,
// `duplex` and `sink` helpers are attached to it as properties.
module.exports = toPull