Skip to content
This repository was archived by the owner on Dec 6, 2022. It is now read-only.

feat: create car file from a complete graph #3

Merged
merged 6 commits into from
Jun 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ jobs:
- name: Install and test
uses: actions/setup-node@v1
with:
node-version: '12.x'
node-version: '14.x'
- run: npm install
- run: npm test
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Other create-modes may be supported in the future, such as writing to a Buffer (
* [`async CarDatastore.readStreamComplete(stream)`](#CarDatastore__readStreamComplete)
* [`async CarDatastore.readStreaming(stream)`](#CarDatastore__readStreaming)
* [`async CarDatastore.writeStream(stream)`](#CarDatastore__writeStream)
* [`async CarDatastore.completeGraph(root, get, car[, concurrency])`](#CarDatastore__completeGraph)
* [`class CarDatastore`](#CarDatastore)
* [`async CarDatastore#get(key)`](#CarDatastore_get)
* [`async CarDatastore#has(key)`](#CarDatastore_has)
Expand Down Expand Up @@ -215,6 +216,24 @@ This create-mode is not available in a browser environment.

**Return value** _(`CarDatastore`)_: an append-only, streaming CarDatastore.

<a name="CarDatastore__completeGraph"></a>
### `async CarDatastore.completeGraph(root, get, car[, concurrency])`

Read a complete IPLD graph from a provided datastore and store the blocks in
a CAR file.

**Parameters:**

* **`root`** _(`CID`)_: the CID of the root block of the graph to start at. The block is
fetched with `get` and included in the CAR, and this CID is set as the CAR's single root.
* **`get`** _(`AsyncFunction`)_: an `async` function that takes a CID and returns
a `Block`. Can be used to attach to an arbitrary data store.
* **`car`** _(`CarDatastore`)_: a writable `CarDatastore` that has not yet been
written to (`setRoots()` will be called on it which requires that no data
has been written).
* **`concurrency`** _(`number`, optional, default=`1`)_: how many asynchronous `get` operations to
perform at once.

<a name="CarDatastore"></a>
### `class CarDatastore`

Expand Down
50 changes: 50 additions & 0 deletions car.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,60 @@ async function writeStream (stream) {
return new CarDatastore(reader, writer)
}

/**
 * Write `block`, and every block reachable from it through its links, to
 * `car` in depth-first order.
 *
 * @param {Block} block the block to write and whose links are followed.
 * @param {AsyncFunction} get takes a CID and resolves to the matching `Block`.
 * @param {CarDatastore} car the writable CAR that receives blocks via `put()`.
 * @param {number} [concurrency=1] maximum number of `get` calls started
 * together; anything that is not a number >= 1 is treated as `1`.
 * @param {Set<string>} [seen] base58btc CID strings of the blocks already
 * written, shared across the recursion so a block is not written twice.
 * @returns {Promise<void>} resolves once the whole sub-graph has been written.
 */
async function traverseBlock (block, get, car, concurrency = 1, seen = new Set()) {
  const cid = await block.cid()
  await car.put(cid, block.encodeUnsafe())
  seen.add(cid.toString('base58btc'))
  if (cid.codec === 'raw') {
    // raw blocks carry no links to follow
    return
  }

  // A width of 0 or NaN would make `splice()` below remove nothing and the
  // loop spin forever, so fall back to fetching one block at a time.
  const width = concurrency >= 1 ? Math.floor(concurrency) : 1
  const keyOf = (link) => link.toString('base58btc')
  const missing = (link) => !seen.has(keyOf(link))

  // `links()` yields `[path, cid]` pairs: unwrap the CID *before* testing it
  // against `seen`, and drop repeated links to the same block.
  const queued = new Set()
  const links = []
  for (const [, link] of block.reader().links()) {
    const key = keyOf(link)
    if (!seen.has(key) && !queued.has(key)) {
      queued.add(key)
      links.push(link)
    }
  }

  while (links.length) {
    // traversing an earlier chunk may already have written some of these
    const chunk = links.splice(0, width).filter(missing)
    const blocks = chunk.map((link) => {
      // call `get` with the CID only (not the index/array that `map` supplies)
      const pending = Promise.resolve(get(link))
      // Mark the promise as handled: it may reject while an earlier sibling is
      // still being traversed. The failure still surfaces at the `await` below.
      pending.catch(() => {})
      return pending
    })
    for (let i = 0; i < chunk.length; i++) {
      // a sibling earlier in this chunk may have reached this block first
      if (missing(chunk[i])) {
        await traverseBlock(await blocks[i], get, car, width, seen)
      }
    }
  }
}

/**
 * @name CarDatastore.completeGraph
 * @description
 * Walk a complete IPLD graph, fetching its blocks through the supplied `get`
 * function, and write every block that is reached into a CAR. The CAR is
 * closed once the traversal has finished.
 * @function
 * @memberof CarDatastore
 * @static
 * @async
 * @param {CID} root the CID of the block the traversal starts from. It is set
 * as the single root of the CAR, and its block is fetched with `get` and
 * written to the CAR along with everything it links to.
 * @param {AsyncFunction} get an `async` function that takes a CID and returns
 * a `Block`. Can be used to attach to an arbitrary data store.
 * @param {CarDatastore} car a writable `CarDatastore` that has not yet been
 * written to (`setRoots()` will be called on it which requires that no data
 * has been written).
 * @param {number} [concurrency=1] how many asynchronous `get` operations to
 * perform at once.
 */
async function completeGraph (root, get, car, concurrency) {
  // the roots are set before any block is fetched or written
  await car.setRoots([root])
  const rootBlock = await get(root)
  await traverseBlock(rootBlock, get, car, concurrency)
  await car.close()
}

module.exports.readBuffer = readBuffer
module.exports.readFileComplete = readFileComplete
module.exports.readStreamComplete = readStreamComplete
module.exports.readStreaming = readStreaming
module.exports.writeStream = writeStream
module.exports.indexer = indexer
module.exports.readRaw = readRaw
module.exports.completeGraph = completeGraph
1 change: 1 addition & 0 deletions datastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class CarDatastore {
}

// Not implemented: being `async`, this always returns a promise that rejects
// with an "Unimplemented operation" error.
async batch () {
/* c8 ignore next */
throw new Error('Unimplemented operation')
}

Expand Down
5 changes: 3 additions & 2 deletions lib/coding.js
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ function encodeStream (roots, blocks) {
encode(writer, roots, blocks)
.then(() => {
stream.end()
}).catch(/* istanbul ignore next toohard */ (err) => {
}).catch((err) => {
/* c8 ignore next 3 */
// maybe this could end up being recursive, with the promise rejection
// above, depending on conditions?
stream.emit('error', err)
Expand All @@ -227,7 +228,7 @@ function createStreamWriter (stream) {
return (buf) => {
return new Promise((resolve, reject) => {
stream.write(buf, (err) => {
// istanbul ignore next toohard
/* c8 ignore next 3 */
if (err) {
reject(err)
}
Expand Down
6 changes: 3 additions & 3 deletions lib/reader-writer-iface.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ const { Errors } = require('interface-datastore')
const { verifyRoots, cidToKey } = require('./util')

class Reader {
// istanbul ignore next
/* c8 ignore next 3 */
get () { // placeholder: always throws; presumably overridden by concrete readers — TODO confirm
throw new Error('Unimplemented method')
}

// istanbul ignore next
/* c8 ignore next 3 */
has () { // placeholder: always throws; presumably overridden by concrete readers — TODO confirm
throw new Error('Unimplemented method')
}

// istanbul ignore next
/* c8 ignore next 3 */
keys () { // placeholder: always throws; presumably overridden by concrete readers — TODO confirm
throw new Error('Unimplemented method')
}
Expand Down
2 changes: 1 addition & 1 deletion lib/writer-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class StreamWriter extends Writer {
}

/* async */ setRoots (roots) {
// istanbul ignore next toohard
/* c8 ignore next 3 */
if (this._mutex) {
throw new Error('Roots already set or blocks are being written')
}
Expand Down
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"test:browser": "npx polendina --cleanup test/test-readbuffer.js test/test-query.js",
"test:node": "hundreds mocha test/test-*.js",
"test": "npm run lint && npm run test:node && npm run test:browser",
"docs": "npx jsdoc4readme --readme *.js lib/raw.js"
"docs": "npx jsdoc4readme --readme *.js lib/raw.js",
"coverage": "c8 --reporter=html --reporter=text mocha test/test-*.js && npx st -d coverage -p 8888"
},
"repository": {
"type": "git",
Expand All @@ -32,13 +33,13 @@
"devDependencies": {
"bl": "^4.0.2",
"garbage": "0.0.0",
"hundreds": "0.0.2",
"mocha": "^7.1.1"
"hundreds": "0.0.7",
"mocha": "^7.2.0"
},
"dependencies": {
"@ipld/block": "^4.0.0",
"cids": "^0.8.0",
"interface-datastore": "^0.8.3",
"interface-datastore": "^1.0.4",
"multicodec": "^1.0.1",
"varint": "^5.0.0"
}
Expand Down
79 changes: 79 additions & 0 deletions test/test-complete-graph.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/* eslint-env mocha */

const assert = require('assert')
const { writeStream, readBuffer, completeGraph } = require('../')
const Block = require('@ipld/block')
const { PassThrough } = require('stream')

const same = assert.deepStrictEqual

/**
 * Lazily walk every block stored in `car`, starting at its first root and
 * following links depth-first, visiting each linked CID at most once.
 *
 * @param {CarDatastore} car a readable CAR exposing `getRoots()` and `get()`.
 * @returns {AsyncGenerator<Block>} yields each block, parent before children.
 */
function all (car) {
  const keyOf = (cid) => cid.toString('base64')

  async function * walk (pending, visited = new Set()) {
    // the first call receives a promise for the root CID, later calls a CID
    const cid = await pending
    visited.add(keyOf(cid))

    const bytes = await car.get(cid)
    const block = Block.create(bytes, cid)
    yield block

    const blockCid = await block.cid()
    if (blockCid.codec === 'raw') {
      return
    }

    for (const [, child] of block.reader().links()) {
      if (!visited.has(keyOf(child))) {
        yield * walk(child, visited)
      }
    }
  }

  const firstRoot = car.getRoots().then(([root]) => root)
  return walk(firstRoot)
}

/**
 * Build an in-memory `get` function over a fixed list of blocks.
 *
 * @param {Iterable<Block>} blocks the blocks to index by their base64 CID string.
 * @returns {Promise<Function>} an async lookup that takes a CID and resolves to
 * the matching block, or to `undefined` when the CID is unknown.
 */
async function createGet (blocks) {
  const keyOf = (cid) => cid.toString('base64')
  const store = new Map()
  for (const entry of blocks) {
    const cid = await entry.cid()
    store.set(keyOf(cid), entry)
  }
  return async (cid) => store.get(keyOf(cid))
}

/**
 * Drain `stream` and join everything it produced into a single Buffer.
 *
 * @param {AsyncIterable<Buffer>} stream the source to read until it ends.
 * @returns {Promise<Buffer>} all chunks concatenated in arrival order.
 */
async function concat (stream) {
  const chunks = []
  for await (const chunk of stream) chunks.push(chunk)
  return Buffer.concat(chunks)
}

// End-to-end check of completeGraph(): build a small graph in memory, write it
// to a CAR through a stream, read the CAR back and compare the blocks.
describe('Create car for full graph', () => {
  it('small graph', async () => {
    const leaf1 = Block.encoder({ hello: 'world' }, 'dag-cbor')
    const leaf2 = Block.encoder({ test: 1 }, 'dag-cbor')
    const raw = Block.encoder(Buffer.from('test'), 'raw')
    // `one` and `three` link to the same block, so the traversal has to cope
    // with a block that is reachable more than once
    const root = Block.encoder(
      {
        one: await leaf1.cid(),
        two: await leaf2.cid(),
        three: await leaf1.cid(),
        zraw: await raw.cid()
      },
      'dag-cbor')
    const expected = [root, leaf1, leaf2, raw]
    const get = await createGet(expected)
    const stream = new PassThrough()
    const car = await writeStream(stream)
    await completeGraph(await root.cid(), get, car)
    const data = await concat(stream)

    const reader = await readBuffer(data)
    const [readRoot, ...more] = await reader.getRoots()
    same(more.length, 0)
    assert.ok(readRoot.equals(await root.cid()))

    // compare positionally, leaving `expected` intact
    let count = 0
    for await (const block of all(reader)) {
      assert.ok(count < expected.length, 'CAR contains more blocks than expected')
      assert.ok((await expected[count].cid()).equals(await block.cid()))
      count++
    }
    // without this a CAR that is missing blocks (or is empty) would still pass
    same(count, expected.length)
  })
})
2 changes: 1 addition & 1 deletion test/test-raw.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ describe('Raw', () => {

// Runs verifyRead against a FileHandle opened on the go.car fixture that sits
// beside this test file, then closes the handle.
it('read raw using index (FileHandle)', async () => {
const fd = await fs.promises.open(path.join(__dirname, 'go.car'))
// NOTE(review): this un-awaited call and the awaited one below look like the
// before/after rows of the diff — confirm only the awaited form is in the file.
verifyRead(fd)
// awaited so that whatever verifyRead returns has settled before the close
await verifyRead(fd)
await fd.close(fd)
})

Expand Down