diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 5eec3e5..51d151c 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -16,6 +16,6 @@ jobs: - name: Install and test uses: actions/setup-node@v1 with: - node-version: '12.x' + node-version: '14.x' - run: npm install - run: npm test diff --git a/README.md b/README.md index d4f9863..8f63a72 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,7 @@ Other create-modes may be supported in the future, such as writing to a Buffer ( * [`async CarDatastore.readStreamComplete(stream)`](#CarDatastore__readStreamComplete) * [`async CarDatastore.readStreaming(stream)`](#CarDatastore__readStreaming) * [`async CarDatastore.writeStream(stream)`](#CarDatastore__writeStream) + * [`async CarDatastore.completeGraph(root, get, car[, concurrency])`](#CarDatastore__completeGraph) * [`class CarDatastore`](#CarDatastore) * [`async CarDatastore#get(key)`](#CarDatastore_get) * [`async CarDatastore#has(key)`](#CarDatastore_has) @@ -215,6 +216,24 @@ This create-mode is not available in a browser environment. **Return value** _(`CarDatastore`)_: an append-only, streaming CarDatastore. + +### `async CarDatastore.completeGraph(root, get, car[, concurrency])` + +Read a complete IPLD graph from a provided datastore and store the blocks in +a CAR file. + +**Parameters:** + +* **`root`** _(`CID`)_: the CID of the root block of the graph to start at; this block will be + fetched with `get`, included in the CAR, and its CID will be set as the single root. +* **`get`** _(`AsyncFunction`)_: an `async` function that takes a CID and returns + a `Block`. Can be used to attach to an arbitrary data store. +* **`car`** _(`CarDatastore`)_: a writable `CarDatastore` that has not yet been + written to (`setRoots()` will be called on it which requires that no data + has been written). +* **`concurrency`** _(`number`, optional, default=`1`)_: how many asynchronous `get` operations to + perform at once. 
/**
 * Write `block` into `car`, then fetch and write every block reachable from
 * it that has not been written yet, depth-first in link order.
 *
 * @private
 * @param {Block} block the block to write and whose links are followed.
 * @param {AsyncFunction} get an `async` function that takes a CID and returns
 * a `Block`.
 * @param {CarDatastore} car the writable `CarDatastore` receiving the blocks.
 * @param {number} [concurrency=1] how many `get` operations to start at once.
 * Values that are not a number of at least 1 are treated as 1.
 * @param {Set<string>} [seen] base58btc strings of the CIDs already written,
 * shared by the whole traversal.
 */
async function traverseBlock (block, get, car, concurrency = 1, seen = new Set()) {
  const cid = await block.cid()
  await car.put(cid, block.encodeUnsafe())
  seen.add(cid.toString('base58btc'))
  if (cid.codec === 'raw') {
    // raw blocks are written as-is and never scanned for links
    return
  }

  const missing = (link) => !seen.has(link.toString('base58btc'))

  // links() yields [path, cid] pairs: unwrap the CID *before* testing it
  // against `seen`, and collapse repeated links to the same block so that
  // each one is fetched at most once from this block.
  const unique = new Map()
  for (const [, link] of block.reader().links()) {
    const key = link.toString('base58btc')
    if (!seen.has(key) && !unique.has(key)) {
      unique.set(key, link)
    }
  }
  const links = Array.from(unique.values())

  // a chunk size below 1 would never consume `links`
  const chunkSize = Math.max(1, Math.floor(Number(concurrency)) || 1)

  for (let offset = 0; offset < links.length; offset += chunkSize) {
    // re-check: traversing an earlier chunk may already have written these
    const chunk = links.slice(offset, offset + chunkSize).filter(missing)

    // start the whole chunk's fetches up front, consume them in order below
    const pending = chunk.map((link) => {
      const fetching = Promise.resolve(get(link))
      // A fetch may fail while an earlier sibling is still being traversed;
      // attach a handler now so it is not reported as unhandled. The
      // rejection still surfaces when `fetching` itself is awaited.
      fetching.catch(() => {})
      return fetching
    })

    for (let i = 0; i < chunk.length; i++) {
      // a sibling's subtree may have written this block in the meantime
      if (missing(chunk[i])) {
        await traverseBlock(await pending[i], get, car, concurrency, seen)
      }
    }
  }
}

/**
 * @name CarDatastore.completeGraph
 * @description
 * Read a complete IPLD graph from a provided datastore and store the blocks in
 * a CAR file.
 * @function
 * @memberof CarDatastore
 * @static
 * @async
 * @param {CID} root the CID of the root block of the graph to start at. The
 * block is fetched with `get`, included in the CAR, and its CID is set as the
 * single root.
 * @param {AsyncFunction} get an `async` function that takes a CID and returns
 * a `Block`. Can be used to attach to an arbitrary data store.
 * @param {CarDatastore} car a writable `CarDatastore` that has not yet been
 * written to (`setRoots()` will be called on it which requires that no data
 * has been written). It is closed once the whole graph has been written.
 * @param {number} [concurrency=1] how many asynchronous `get` operations to
 * perform at once.
 */
async function completeGraph (root, get, car, concurrency = 1) {
  await car.setRoots([root])
  await traverseBlock(await get(root), get, car, concurrency)
  await car.close()
}
stream.emit('error', err) @@ -227,7 +228,7 @@ function createStreamWriter (stream) { return (buf) => { return new Promise((resolve, reject) => { stream.write(buf, (err) => { - // istanbul ignore next toohard + /* c8 ignore next 3 */ if (err) { reject(err) } diff --git a/lib/reader-writer-iface.js b/lib/reader-writer-iface.js index bf6f2a3..16dc326 100644 --- a/lib/reader-writer-iface.js +++ b/lib/reader-writer-iface.js @@ -2,17 +2,17 @@ const { Errors } = require('interface-datastore') const { verifyRoots, cidToKey } = require('./util') class Reader { - // istanbul ignore next + /* c8 ignore next 3 */ get () { throw new Error('Unimplemented method') } - // istanbul ignore next + /* c8 ignore next 3 */ has () { throw new Error('Unimplemented method') } - // istanbul ignore next + /* c8 ignore next 3 */ keys () { throw new Error('Unimplemented method') } diff --git a/lib/writer-stream.js b/lib/writer-stream.js index 871a9ef..6456a2f 100644 --- a/lib/writer-stream.js +++ b/lib/writer-stream.js @@ -13,7 +13,7 @@ class StreamWriter extends Writer { } /* async */ setRoots (roots) { - // istanbul ignore next toohard + /* c8 ignore next 3 */ if (this._mutex) { throw new Error('Roots already set or blocks are being written') } diff --git a/package.json b/package.json index 302e8ce..01b67d1 100644 --- a/package.json +++ b/package.json @@ -12,7 +12,8 @@ "test:browser": "npx polendina --cleanup test/test-readbuffer.js test/test-query.js", "test:node": "hundreds mocha test/test-*.js", "test": "npm run lint && npm run test:node && npm run test:browser", - "docs": "npx jsdoc4readme --readme *.js lib/raw.js" + "docs": "npx jsdoc4readme --readme *.js lib/raw.js", + "coverage": "c8 --reporter=html --reporter=text mocha test/test-*.js && npx st -d coverage -p 8888" }, "repository": { "type": "git", @@ -32,13 +33,13 @@ "devDependencies": { "bl": "^4.0.2", "garbage": "0.0.0", - "hundreds": "0.0.2", - "mocha": "^7.1.1" + "hundreds": "0.0.7", + "mocha": "^7.2.0" }, "dependencies": { 
/* eslint-env mocha */

const assert = require('assert')
const { writeStream, readBuffer, completeGraph } = require('../')
const Block = require('@ipld/block')
const { PassThrough } = require('stream')

const same = assert.deepStrictEqual

/**
 * Walk the graph stored in `car` depth-first, starting from its first root,
 * and yield each block the first time it is reached.
 *
 * @param {CarDatastore} car a readable CarDatastore.
 * @returns {AsyncGenerator<Block>} the blocks in traversal order.
 */
function all (car) {
  const _traverse = async function * (link, seen = new Set()) {
    // the very first `link` is a Promise for the root CID
    link = await link
    seen.add(link.toString('base64'))
    const encoded = await car.get(link)
    const block = Block.create(encoded, link)
    yield block
    const cid = await block.cid()
    if (cid.codec === 'raw') {
      // raw blocks are not scanned for links
      return
    }

    for (const [, link] of block.reader().links()) {
      if (seen.has(link.toString('base64'))) {
        continue
      }
      yield * _traverse(link, seen)
    }
  }

  return _traverse(car.getRoots().then(([root]) => root))
}

/**
 * Build an in-memory `get(cid)` function over the given blocks.
 *
 * @param {Block[]} blocks the blocks to serve.
 * @returns {Promise<AsyncFunction>} an `async` function resolving a CID to
 * the stored block, or `undefined` when the CID is not present.
 */
async function createGet (blocks) {
  const db = new Map()
  for (const block of blocks) {
    db.set((await block.cid()).toString('base64'), block)
  }
  return async (cid) => db.get(cid.toString('base64'))
}

/**
 * Collect every chunk emitted by `stream` into a single Buffer.
 *
 * @param {AsyncIterable<Buffer>} stream the stream to drain.
 * @returns {Promise<Buffer>} the concatenated contents.
 */
async function concat (stream) {
  const buffers = []
  for await (const buffer of stream) {
    buffers.push(buffer)
  }
  return Buffer.concat(buffers)
}

describe('Create car for full graph', () => {
  it('small graph', async () => {
    const leaf1 = Block.encoder({ hello: 'world' }, 'dag-cbor')
    const leaf2 = Block.encoder({ test: 1 }, 'dag-cbor')
    const raw = Block.encoder(Buffer.from('test'), 'raw')
    const root = Block.encoder(
      {
        one: await leaf1.cid(),
        two: await leaf2.cid(),
        three: await leaf1.cid(),
        zraw: await raw.cid()
      },
      'dag-cbor')
    const expected = [root, leaf1, leaf2, raw]
    const get = await createGet(expected)
    const stream = new PassThrough()
    const car = await writeStream(stream)
    await completeGraph(await root.cid(), get, car)
    const data = await concat(stream)

    const reader = await readBuffer(data)
    const [readRoot, ...more] = await reader.getRoots()
    same(more.length, 0)
    assert.ok(readRoot.equals(await root.cid()))

    // consume a copy so `expected` itself stays intact
    const remaining = expected.slice()
    for await (const block of all(reader)) {
      const expectedBlock = remaining.shift()
      assert.ok(expectedBlock, 'CAR contains more blocks than expected')
      assert.ok((await expectedBlock.cid()).equals(await block.cid()))
    }
    // without this, a CAR that is missing trailing blocks would pass
    same(remaining.length, 0)
  })
})