This repository was archived by the owner on Aug 12, 2020. It is now read-only.

Commit 833accf

achingbrain authored and daviddias committed
feat: Add reader to read files or part of files as streams
1 parent dd5c7ff commit 833accf

7 files changed: +480 -2 lines changed

README.md (+94 -2)
@@ -167,7 +167,7 @@ const Exporter = require('ipfs-unixfs-engine').Exporter
 
 ### new Exporter(<cid or ipfsPath>, <dag or ipld-resolver>)
 
-Uses the given [dag API or an ipld-resolver instance][] to fetch an IPFS [UnixFS][] object(s) by their multiaddress.
+Uses the given [dag API][] or an [ipld-resolver instance][] to fetch IPFS [UnixFS][] objects by their multiaddress.
 
 Creates a new readable stream in object mode that outputs objects of the form
 
@@ -181,9 +181,101 @@ Creates a new readable stream in object mode that outputs objects of the form
 Errors are received as with a normal stream, by listening on the `'error'` event to be emitted.
 
-[IPLD Resolver]: https://github.com/ipld/js-ipld-resolver
+[dag API]: https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/DAG.md
+[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
 [UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs

## Reader

The `reader` allows you to receive part or all of a file as a [pull-stream].

#### Reader example

```js
const reader = require('ipfs-unixfs-engine').reader
const pull = require('pull-stream')
const collect = require('pull-stream/sinks/collect')

pull(
  reader(cid, ipldResolver),
  collect((error, chunks) => {
    // do something with the file chunks and/or handle errors
  })
)
```

#### Reader API

```js
const reader = require('ipfs-unixfs-engine').reader
```

### reader(<cid or ipfsPath>, <dag or ipld-resolver>, <begin>, <end>)

Uses the given [dag API][] or an [ipld-resolver instance][] to fetch an IPFS [UnixFS][] object by its multiaddress.

Creates a new [pull-stream][] that sends the requested chunks of data as a series of [Buffer][] objects.

```js
const reader = require('ipfs-unixfs-engine').reader
const pull = require('pull-stream')
const drain = require('pull-stream/sinks/drain')

pull(
  reader(cid, ipldResolver),
  drain((chunk) => {
    // do something with the file chunk
  })
)
```

#### `begin` and `end`

`begin` and `end` arguments can optionally be passed to the reader function. These follow the same semantics as the JavaScript [`Array.slice(begin, end)`][] method.

That is: `begin` is the index in the stream at which to start sending data and `end` is the index *before* which to stop sending data.

Negative values are treated as offsets from the end of the stream: a negative `begin` starts the slice that many bytes before the end of the stream, and a negative `end` stops the slice that many bytes before the end.

See [the tests](test/reader.js) for more examples of using these arguments.

```js
const reader = require('ipfs-unixfs-engine').reader
const pull = require('pull-stream')
const drain = require('pull-stream/sinks/drain')

pull(
  reader(cid, ipldResolver, 0, 10),
  drain((chunk) => {
    // chunk is a Buffer containing only the first 10 bytes of the stream
  })
)
```
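
A negative `begin` works the same way. The following is a minimal sketch (not part of the original README) that reads just the last 10 bytes of the stream, assuming the same `cid` and `ipldResolver` as above:

```js
const reader = require('ipfs-unixfs-engine').reader
const pull = require('pull-stream')
const drain = require('pull-stream/sinks/drain')

pull(
  // a begin of -10 resolves to fileSize - 10, so only the final
  // 10 bytes of the file are sent
  reader(cid, ipldResolver, -10),
  drain((chunk) => {
    // chunk is a Buffer containing only the last 10 bytes of the stream
  })
)
```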

#### Errors

Errors are received by [pull-stream][] sinks.

```js
const reader = require('ipfs-unixfs-engine').reader
const pull = require('pull-stream')
const collect = require('pull-stream/sinks/collect')

pull(
  reader(cid, ipldResolver, 0, 10),
  collect((error, chunks) => {
    // handle the error
  })
)
```
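
For instance, one way such an error can arise (a sketch assuming a hypothetical `directoryCid` that points at a directory rather than a file): the reader only reads files, so the error constructed in `src/reader/index.js` arrives in the sink's callback.

```js
const reader = require('ipfs-unixfs-engine').reader
const pull = require('pull-stream')
const collect = require('pull-stream/sinks/collect')

pull(
  reader(directoryCid, ipldResolver),
  collect((error, chunks) => {
    // error.message has the form
    // "Path <directoryCid> was not a file (was <type>), can only read files"
  })
)
```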

[pull-stream]: https://www.npmjs.com/package/pull-stream
[Buffer]: https://www.npmjs.com/package/buffer
[dag API]: https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/DAG.md
[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
[UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs
[`Array.slice(begin, end)`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/slice

## Contribute

Feel free to join in. All welcome. Open an [issue](https://github.com/ipfs/js-ipfs-unixfs-engine/issues)!

package.json (+1)

@@ -66,6 +66,7 @@
 "lodash": "^4.17.5",
 "multihashes": "~0.4.13",
 "multihashing-async": "~0.4.8",
+"pull-async-values": "^1.0.3",
 "pull-batch": "^1.0.0",
 "pull-block": "^1.4.0",
 "pull-cat": "^1.1.11",

src/index.js (+1)

@@ -2,3 +2,4 @@
 
 exports.importer = exports.Importer = require('./importer')
 exports.exporter = exports.Exporter = require('./exporter')
+exports.reader = exports.Reader = require('./reader')

src/reader/index.js (+137, new file)

@@ -0,0 +1,137 @@

```js
'use strict'

const CID = require('cids')
const pull = require('pull-stream')
const asyncValues = require('pull-async-values')
const asyncMap = require('pull-stream/throughs/async-map')
const map = require('pull-stream/throughs/map')
const UnixFS = require('ipfs-unixfs')
const toB58String = require('multihashes').toB58String
const waterfall = require('async/waterfall')

module.exports = (path, ipldResolver, begin = 0, end) => {
  let streamPosition = 0

  return pull(
    asyncValues((cb) => {
      waterfall([
        (next) => toCid(path, next),
        (cid, next) => ipldResolver.get(cid, next),
        (node, next) => {
          const meta = UnixFS.unmarshal(node.value.data)

          if (meta.type !== 'file') {
            return next(new Error(`Path ${path} was not a file (was ${meta.type}), can only read files`))
          }

          const fileSize = meta.fileSize()

          if (!end || end > fileSize) {
            end = fileSize
          }

          if (begin < 0) {
            begin = fileSize + begin
          }

          if (end < 0) {
            end = fileSize + end
          }

          const links = node.value.links

          if (!links || !links.length) {
            if (meta.data && meta.data.length) {
              // file was small enough to fit in one DAGNode so has no links
              return next(null, [(done) => done(null, meta.data)])
            }

            return next(new Error(`Path ${path} had no links or data`))
          }

          const linkedDataSize = links.reduce((acc, curr) => acc + curr.size, 0)
          const overhead = (linkedDataSize - meta.fileSize()) / links.length

          // create an array of functions to fetch link data
          next(null, links.map((link) => (done) => {
            // DAGNode Links report unixfs object data sizes $overhead bytes (typically 14)
            // larger than they actually are due to the protobuf wrapper
            const bytesInLinkedObjectData = link.size - overhead

            if (begin > (streamPosition + bytesInLinkedObjectData)) {
              // Start byte is after this block so skip it
              streamPosition += bytesInLinkedObjectData

              return done()
            }

            if (end < streamPosition) {
              // End byte was before this block so skip it
              streamPosition += bytesInLinkedObjectData

              return done()
            }

            // transform the multihash to a cid, the cid to a node and the node to some data
            waterfall([
              (next) => toCid(link.multihash, next),
              (cid, next) => ipldResolver.get(cid, next),
              (node, next) => next(null, node.value.data),
              (data, next) => next(null, UnixFS.unmarshal(data).data)
            ], done)
          }))
        }
      ], cb)
    }),
    asyncMap((loadLinkData, cb) => loadLinkData(cb)),
    pull.filter(Boolean),
    map((data) => {
      const block = extractDataFromBlock(data, streamPosition, begin, end)

      streamPosition += data.length

      return block
    })
  )
}

function toCid (input, callback) {
  let path = input
  let cid

  try {
    if (Buffer.isBuffer(path)) {
      path = toB58String(path)
    }

    if (path.indexOf('/ipfs/') === 0) {
      path = path.substring('/ipfs/'.length)
    }

    if (path.charAt(path.length - 1) === '/') {
      path = path.substring(0, path.length - 1)
    }

    cid = new CID(path)
  } catch (error) {
    return callback(new Error(`Path '${input}' was invalid: ${error.message}`))
  }

  callback(null, cid)
}

function extractDataFromBlock (block, streamPosition, begin, end) {
  const blockLength = block.length

  if (end - streamPosition < blockLength) {
    // If the end byte is in the current block, truncate the block to the end byte
    block = block.slice(0, end - streamPosition)
  }

  if (begin > streamPosition && begin < (streamPosition + blockLength)) {
    // If the start byte is in the current block, skip to the start byte
    block = block.slice(begin - streamPosition)
  }

  return block
}
```

test/browser.js (+3)

@@ -60,6 +60,9 @@ describe('IPFS data importing tests on the Browser', function () {
 // require('./exporter')(repo)
 // require('./exporter-subtree')(repo)
 
+// Reader
+require('./reader')(repo)
+
 // Other
 require('./import-export')(repo)
 require('./import-export-nested-dir')(repo)

test/node.js (+3)

@@ -61,6 +61,9 @@ describe('IPFS UnixFS Engine', () => {
 require('./exporter')(repo)
 require('./exporter-subtree')(repo)
 
+// Reader
+require('./reader')(repo)
+
 // Other
 require('./import-export')(repo)
 require('./import-export-nested-dir')(repo)

0 commit comments
