From 71f1aac954a7bda407e07ae454d449fb72db0f80 Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Sun, 10 Feb 2019 15:48:37 -0800 Subject: [PATCH 1/8] Refactor js-repo away from callbacks leaning on async/await --- example.js | 16 +- package.json | 8 +- src/api-addr.js | 22 ++- src/blockstore.js | 169 ++++++++------------ src/config.js | 96 ++++-------- src/index.js | 330 +++++++++++++++------------------------- src/lock-memory.js | 24 +-- src/lock.js | 21 +-- src/spec.js | 29 ++-- src/version.js | 50 +++--- test/blockstore-test.js | 278 +++++++++++---------------------- test/datastore-test.js | 75 ++++----- test/interop-test.js | 40 ++--- test/lock-test.js | 42 ++--- test/node.js | 73 ++++----- test/options-test.js | 22 +-- test/repo-test.js | 126 +++++---------- test/stat-test.js | 54 +++---- 18 files changed, 525 insertions(+), 950 deletions(-) diff --git a/example.js b/example.js index 8239c353..9f26bdf8 100644 --- a/example.js +++ b/example.js @@ -3,16 +3,6 @@ const Repo = require('ipfs-repo') const repo = new Repo('/Users/awesome/.jsipfs') -repo.init({ my: 'config' }, (err) => { - if (err) { - throw err - } - - repo.open((err) => { - if (err) { - throw err - } - - console.log('repo is ready') - }) -}) +repo.init({ my: 'config' }) + .then(repo.open) + .then(() => console.log('repo is ready')) diff --git a/package.json b/package.json index ded9f0e1..e974b1b7 100644 --- a/package.json +++ b/package.json @@ -53,17 +53,15 @@ "rimraf": "^2.6.3" }, "dependencies": { - "async": "^2.6.2", "base32.js": "~0.1.0", "bignumber.js": "^8.1.1", "buffer": "^5.2.1", "cids": "~0.5.8", "datastore-core": "~0.6.0", - "datastore-fs": "~0.8.0", - "datastore-level": "~0.10.0", + "datastore-fs": "~0.7.0", + "datastore-level": "git://github.com/ipfs/js-datastore-level.git#refactor/async-iterators", "debug": "^4.1.0", - "dlv": "^1.1.2", - "interface-datastore": "~0.6.0", + "interface-datastore": "git://github.com/ipfs/interface-datastore.git#refactor/async-iterators", "ipfs-block": "~0.8.0", 
"just-safe-set": "^2.1.0", "multiaddr": "^6.0.6", diff --git a/src/api-addr.js b/src/api-addr.js index 6dfe2aff..dc863d60 100644 --- a/src/api-addr.js +++ b/src/api-addr.js @@ -9,30 +9,28 @@ module.exports = (store) => { /** * Get the current configuration from the repo. * - * @param {function(Error, Object)} callback - * @returns {void} + * @returns {Promise} */ - get (callback) { - store.get(apiFile, (err, value) => callback(err, value && value.toString())) + async get () { + const value = await store.get(apiFile) + return value && value.toString() }, /** * Set the current configuration for this repo. * * @param {Object} value - the api address to be written - * @param {function(Error)} callback - * @returns {void} + * @returns {Promise} */ - set (value, callback) { - store.put(apiFile, Buffer.from(value.toString()), callback) + set (value) { + return store.put(apiFile, Buffer.from(value.toString())) }, /** * Deletes api file * - * @param {function(Error, bool)} callback - * @returns {void} + * @returns {Promise} */ - delete (callback) { - store.delete(apiFile, callback) + delete () { + return store.delete(apiFile) } } } diff --git a/src/blockstore.js b/src/blockstore.js index e081f810..d4fd71e1 100644 --- a/src/blockstore.js +++ b/src/blockstore.js @@ -5,11 +5,7 @@ const ShardingStore = core.ShardingDatastore const Key = require('interface-datastore').Key const base32 = require('base32.js') const Block = require('ipfs-block') -const setImmediate = require('async/setImmediate') -const reject = require('async/reject') const CID = require('cids') -const pull = require('pull-stream/pull') -const collect = require('pull-stream/sinks/collect') /** * Transform a raw buffer to a base32 encoded key. 
@@ -32,21 +28,17 @@ const cidToDsKey = (cid) => { return keyFromBuffer(cid.buffer) } -module.exports = (filestore, options, callback) => { - maybeWithSharding(filestore, options, (err, store) => { - if (err) { return callback(err) } - - callback(null, createBaseStore(store)) - }) +module.exports = async (filestore, options) => { + const store = await maybeWithSharding(filestore, options) + return createBaseStore(store) } -function maybeWithSharding (filestore, options, callback) { +function maybeWithSharding (filestore, options) { if (options.sharding) { const shard = new core.shard.NextToLast(2) - ShardingStore.createOrOpen(filestore, shard, callback) - } else { - setImmediate(() => callback(null, filestore)) + return ShardingStore.createOrOpen(filestore, shard) } + return filestore } function createBaseStore (store) { @@ -55,142 +47,113 @@ function createBaseStore (store) { * Query the store. * * @param {object} query - * @param {function(Error, Array)} callback - * @return {void} + * @return {Iterable} */ - query (query, callback) { - pull( - store.query(query), - collect(callback) - ) + query (query) { + return store.query(query) }, /** * Get a single block by CID. * * @param {CID} cid - * @param {function(Error, Block)} callback - * @returns {void} + * @returns {Promise} */ - get (cid, callback) { + async get (cid) { if (!CID.isCID(cid)) { - return setImmediate(() => { - callback(new Error('Not a valid cid')) - }) + throw new Error('Not a valid cid') } - const key = cidToDsKey(cid) - store.get(key, (err, blockData) => { - if (err) { - // If not found, we try with the other CID version. - // If exists, then store that block under the CID that was requested. - // Some duplication occurs. 
- if (err.code === 'ERR_NOT_FOUND') { - const otherCid = cidToOtherVersion(cid) - if (!otherCid) return callback(err) - - const otherKey = cidToDsKey(otherCid) - return store.get(otherKey, (err, blockData) => { - if (err) return callback(err) - - store.put(key, blockData, (err) => { - if (err) return callback(err) - callback(null, new Block(blockData, cid)) - }) - }) - } - - return callback(err) + let blockData + try { + blockData = await store.get(key) + return new Block(blockData, cid) + } catch (err) { + if (err.code === 'ERR_NOT_FOUND') { + const otherCid = cidToOtherVersion(cid) + if (!otherCid) throw err + + const otherKey = cidToDsKey(otherCid) + const blockData = await store.get(otherKey) + await store.put(key, blockData) + return new Block(blockData, cid) } - - callback(null, new Block(blockData, cid)) - }) + } }, - put (block, callback) { + /** + * Write a single block to the store. + * + * @param {Block} block + * @returns {Promise} + */ + put (block) { if (!Block.isBlock(block)) { - return setImmediate(() => { - callback(new Error('invalid block')) - }) + throw new Error('invalid block') } const k = cidToDsKey(block.cid) - - store.has(k, (err, exists) => { - if (err) { return callback(err) } - if (exists) { return callback() } - - store.put(k, block.data, callback) + return store.has(k).then((exists) => { + if (exists) { return } + return store.put(k, block.data) }) }, + /** * Like put, but for more. 
* * @param {Array} blocks - * @param {function(Error)} callback - * @returns {void} + * @returns {Promise} */ - putMany (blocks, callback) { + async putMany (blocks) { const keys = blocks.map((b) => ({ key: cidToDsKey(b.cid), block: b })) - const batch = store.batch() - reject(keys, (k, cb) => store.has(k.key, cb), (err, newKeys) => { - if (err) { - return callback(err) - } - - newKeys.forEach((k) => { - batch.put(k.key, k.block.data) - }) - - batch.commit(callback) + const batch = await store.batch() + const newKeys = await Promise.all(keys.filter((k) => { store.has(k.key) })) + newKeys.forEach((k) => { + batch.put(k.key, k.block.data) }) + return batch.commit() }, /** * Does the store contain block with this cid? * * @param {CID} cid - * @param {function(Error, bool)} callback - * @returns {void} + * @returns {Promise} */ - has (cid, callback) { + has (cid) { if (!CID.isCID(cid)) { - return setImmediate(() => { - callback(new Error('Not a valid cid')) - }) + throw new Error('Not a valid cid') } - store.has(cidToDsKey(cid), (err, exists) => { - if (err) return callback(err) - if (exists) return callback(null, true) - - // If not found, we try with the other CID version. 
- const otherCid = cidToOtherVersion(cid) - if (!otherCid) return callback(null, false) - - store.has(cidToDsKey(otherCid), callback) - }) + return store.has(cidToDsKey(cid)) + .then((exists) => { + if (exists) return exists + const otherCid = cidToOtherVersion(cid) + if (!otherCid) return false + return store.has(cidToDsKey(otherCid)) + }) }, /** * Delete a block from the store * * @param {CID} cid - * @param {function(Error)} callback - * @returns {void} + * @returns {Promise} */ - delete (cid, callback) { + delete (cid) { if (!CID.isCID(cid)) { - return setImmediate(() => { - callback(new Error('Not a valid cid')) - }) + throw new Error('Not a valid cid') } - - store.delete(cidToDsKey(cid), callback) + return store.delete(cidToDsKey(cid)) }, - - close (callback) { - store.close(callback) + /** + * Close the store + * + * @returns {Promise} + */ + close () { + return store.close() } } } diff --git a/src/config.js b/src/config.js index c5840a05..ce9541a2 100644 --- a/src/config.js +++ b/src/config.js @@ -2,120 +2,84 @@ const Key = require('interface-datastore').Key const queue = require('async/queue') -const waterfall = require('async/waterfall') -const _get = require('dlv') -const _set = require('just-safe-set') -const Buffer = require('buffer').Buffer +const _get = require('lodash.get') +const _set = require('lodash.set') +const _has = require('lodash.has') +const Buffer = require('safe-buffer').Buffer const configKey = new Key('config') module.exports = (store) => { const setQueue = queue(_doSet, 1) + setQueue.error = (err) => { throw err } const configStore = { /** * Get the current configuration from the repo. 
* * @param {String} key - the config key to get - * @param {function(Error, Object)} callback - * @returns {void} + * @returns {Promise} */ - get (key, callback) { - if (typeof key === 'function') { - callback = key - key = undefined - } + get (key) { if (!key) { key = undefined } - store.get(configKey, (err, encodedValue) => { - if (err) { return callback(err) } - - let config - try { - config = JSON.parse(encodedValue.toString()) - } catch (err) { - return callback(err) - } - - if (typeof key === 'undefined') { - return callback(null, config) - } - - if (typeof key !== 'string') { - return callback(new Error('Key ' + key + ' must be a string.')) - } - - const value = _get(config, key, null) - - if (value === null) { - return callback(new Error('Key ' + key + ' does not exist in config.')) - } - - callback(null, value) - }) + return store.get(configKey) + .then((encodedValue) => { + const config = JSON.parse(encodedValue.toString()) + if (key !== undefined && !_has(config, key)) { + throw new Error(`Key ${key} does not exist in config`) + } + const value = key !== undefined ? _get(config, key) : config + return value + }) }, /** * Set the current configuration for this repo. * * @param {String} key - the config key to be written * @param {Object} value - the config value to be written - * @param {function(Error)} callback * @returns {void} */ - set (key, value, callback) { - if (typeof value === 'function') { - callback = value - value = key - key = undefined - } else if (!key || typeof key !== 'string') { - return callback(new Error('Invalid key type')) + set (key, value) { + if (!key || typeof key !== 'string') { + throw new Error('Invalid key type') } if (value === undefined || Buffer.isBuffer(value)) { - return callback(new Error('Invalid value type')) + throw new Error('Invalid value type') } setQueue.push({ key: key, value: value - }, callback) + }) }, /** * Check if a config file exists. 
* - * @param {function(Error, bool)} callback - * @returns {void} + * @returns {Promise} */ - exists (callback) { - store.has(configKey, callback) + exists () { + return store.has(configKey) } } return configStore - function _doSet (m, callback) { + async function _doSet (m) { const key = m.key const value = m.value if (key) { - waterfall( - [ - (cb) => configStore.get(cb), - (config, cb) => { - _set(config, key, value) - cb(null, config) - }, - _saveAll - ], - callback) - } else { - _saveAll(value, callback) + const config = await configStore.get() + _set(config, key, value) + await _saveAll(config) } } - function _saveAll (config, callback) { + function _saveAll (config) { const buf = Buffer.from(JSON.stringify(config, null, 2)) - store.put(configKey, buf, callback) + return store.put(configKey, buf) } } diff --git a/src/index.js b/src/index.js index 2e018ded..717814c1 100644 --- a/src/index.js +++ b/src/index.js @@ -1,16 +1,10 @@ 'use strict' -const waterfall = require('async/waterfall') -const series = require('async/series') -const parallel = require('async/parallel') -const each = require('async/each') -const _get = require('dlv') +const _get = require('lodash.get') const assert = require('assert') const path = require('path') const debug = require('debug') const Big = require('bignumber.js') -const pull = require('pull-stream/pull') -const reduce = require('pull-stream/sinks/reduce') const backends = require('./backends') const version = require('./version') @@ -22,6 +16,7 @@ const defaultOptions = require('./default-options') const defaultDatastore = require('./default-datastore') const ERRORS = require('./errors') + const log = debug('repo') const noLimit = Number.MAX_SAFE_INTEGER @@ -62,83 +57,54 @@ class IpfsRepo { * Initialize a new repo. * * @param {Object} config - config to write into `config`. 
- * @param {function(Error)} callback - * @returns {void} + * @returns {Promise} */ - init (config, callback) { + init (config) { log('initializing at: %s', this.path) - - series([ - (cb) => this.root.open(ignoringAlreadyOpened(cb)), - (cb) => this.config.set(buildConfig(config), cb), - (cb) => this.spec.set(buildDatastoreSpec(config), cb), - (cb) => this.version.set(repoVersion, cb) - ], callback) + return this.root.open() + .then(() => this.config.set(buildConfig(config))) + .then(() => this.spec.set(buildDatastoreSpec(config))) + .then(() => this.version.set(repoVersion)) } /** * Open the repo. If the repo is already open no action will be taken. * If the repo is not initialized it will return an error. * - * @param {function(Error)} callback - * @returns {void} + * @returns {Promise} */ - open (callback) { + async open () { if (!this.closed) { - setImmediate(() => callback(new Error('repo is already open'))) - return // early + throw new Error('repo is already open') } log('opening at: %s', this.path) // check if the repo is already initialized - waterfall([ - (cb) => this.root.open(ignoringAlreadyOpened(cb)), - (cb) => this._isInitialized(cb), - (cb) => this._openLock(this.path, cb), - (lck, cb) => { - log('aquired repo.lock') - this.lockfile = lck - cb() - }, - (cb) => { - log('creating datastore') - this.datastore = backends.create('datastore', path.join(this.path, 'datastore'), this.options) - log('creating blocks') - const blocksBaseStore = backends.create('blocks', path.join(this.path, 'blocks'), this.options) - blockstore( - blocksBaseStore, - this.options.storageBackendOptions.blocks, - cb) - }, - (blocks, cb) => { - this.blocks = blocks - cb() - }, - (cb) => { - log('creating keystore') - this.keys = backends.create('keys', path.join(this.path, 'keys'), this.options) - cb() - }, - - (cb) => { - this.closed = false - log('all opened') - cb() - } - ], (err) => { + try { + await this.root.open() + await this._isInitialized() + this.lockfile = await 
this._openLock(this.path) + log('aquired repo.lock') + log('creating datastore') + this.datastore = backends.create('datastore', path.join(this.path, 'datastore'), this.options) + log('creating blocks') + const blocksBaseStore = backends.create('blocks', path.join(this.path, 'blocks'), this.options) + this.blocks = await blockstore(blocksBaseStore, this.options.storageBackendOptions.blocks) + log('creating keystore') + this.keys = backends.create('keys', path.join(this.path, 'keys'), this.options) + this.closed = false + log('all opened') + } catch (err) { if (err && this.lockfile) { - this._closeLock((err2) => { - if (!err2) { - this.lockfile = null - } else { - log('error removing lock', err2) - } - callback(err) - }) - } else { - callback(err) + try { + this._closeLock() + } catch (err2) { + log('error removing lock', err2) + } + this.lockfile = null + throw err } - }) + } } /** @@ -162,102 +128,78 @@ class IpfsRepo { * be returned in the callback if one has been created. * * @param {string} path - * @param {function(Error, lockfile)} callback - * @returns {void} + * @returns {Promise} */ - _openLock (path, callback) { - this._locker.lock(path, (err, lockfile) => { - if (err) { - return callback(err, null) - } - - assert.strictEqual(typeof lockfile.close, 'function', 'Locks must have a close method') - callback(null, lockfile) - }) + async _openLock (path) { + const lockfile = await this._locker.lock(path) + assert.strictEqual(typeof lockfile.close, 'function', 'Locks must have a close method') + return lockfile } /** * Closes the lock on the repo * - * @param {function(Error)} callback - * @returns {void} + * @returns {Promise} */ - _closeLock (callback) { + _closeLock () { if (this.lockfile) { - return this.lockfile.close(callback) + return this.lockfile.close() } - callback() } /** * Check if the repo is already initialized. 
- * * @private - * @param {function(Error)} callback - * @returns {void} + * @returns {Promise} */ - _isInitialized (callback) { + async _isInitialized () { log('init check') - parallel( - { - config: (cb) => this.config.exists(cb), - spec: (cb) => this.spec.exists(cb), - version: (cb) => this.version.check(repoVersion, cb) - }, - (err, res) => { - log('init', err, res) - if (err && !res.config) { - return callback(Object.assign(new Error('repo is not initialized yet'), - { - code: ERRORS.ERR_REPO_NOT_INITIALIZED, - path: this.path - })) - } - callback(err) + let res + let config, spec, version + try { + [config, spec, version] = await Promise.all([this.config.exists(), this.spec.exists(), this.version.check(repoVersion)]) + res = { + config: config, + spec: spec, + version: version + } + } catch (err) { + if (err && !res.config) { + throw Object.assign(new Error('repo is not initialized yet'), + { + code: ERRORS.ERR_REPO_NOT_INITIALIZED, + path: this.path + }) } - ) + throw err + } } /** * Close the repo and cleanup. * - * @param {function(Error)} callback - * @returns {void} + * @returns {Promise} */ - close (callback) { + async close () { if (this.closed) { - return callback(new Error('repo is already closed')) + throw new Error('repo is already closed') } - log('closing at: %s', this.path) - series([ - (cb) => this.apiAddr.delete(ignoringNotFound(cb)), - (cb) => { - each( - [this.blocks, this.keys, this.datastore], - (store, callback) => store.close(callback), - cb) - }, - (cb) => { - log('unlocking') - this.closed = true - this._closeLock(cb) - }, - (cb) => { - this.lockfile = null - cb() - } - ], (err) => callback(err)) + await this.apiAddr.delete() + await Promise.all([this.blocks, this.keys, this.datastore].map((store) => store.close())) + log('unlocking') + this.closed = true + await this._closeLock() + this.lockfile = null } /** * Check if a repo exists. 
* - * @param {function(Error, bool)} callback - * @returns {void} + * @returns {Promise} */ - exists (callback) { - this.version.exists(callback) + exists () { + return this.version.exists() } /** @@ -265,96 +207,70 @@ class IpfsRepo { * * @param {Object} options * @param {Boolean} options.human - * @param {function(Error, Object)} callback - * @return {void} + * @return {Object} */ - stat (options, callback) { - if (typeof options === 'function') { - callback = options - options = {} + async stat (options) { + options = Object.assign({}, { human: false }, options) + let storageMax, blocks, version, datastore, keys + [storageMax, blocks, version, datastore, keys] = await Promise.all([ + this._storageMaxStat(), + this._blockStat(), + this.version.get(), + getSize(this.datastore), + getSize(this.keys) + ]) + let size = blocks.size + .plus(datastore) + .plus(keys) + + if (options.human) { + size = size.div(1048576) } + return { + repoPath: this.path, + storageMax: storageMax, + version: version, + numObjects: blocks.count, + repoSize: size + } + } - options = Object.assign({}, { human: false }, options) + _storageMaxStat () { + return this.config.get('Datastore.StorageMax') + .then((max) => new Big(max)) + .catch(() => new Big(noLimit)) + } - parallel({ - storageMax: (cb) => this.config.get('Datastore.StorageMax', (err, max) => { - if (err) { - cb(null, new Big(noLimit)) - } else { - cb(null, new Big(max)) - } - }), - version: (cb) => this.version.get(cb), - blocks: (cb) => this.blocks.query({}, (err, list) => { - list = list || [] - - const count = new Big(list.length) - let size = new Big(0) - - list.forEach(block => { - size = size - .plus(block.value.byteLength) - .plus(block.key._buf.byteLength) - }) - - cb(err, { - count: count, - size: size - }) - }), - datastore: (cb) => getSize(this.datastore, cb), - keys: (cb) => getSize(this.keys, cb) - }, (err, results) => { - if (err) return callback(err) - - let size = results.blocks.size - .plus(results.datastore) - 
.plus(results.keys) - - if (options.human) { - size = size.div(1048576) - } + async _blockStat () { + const list = [] + for await (const block of this.blocks.query({})) { + list.push(block) + } + const count = new Big(list.length) + let size = new Big(0) - callback(null, { - repoPath: this.path, - storageMax: results.storageMax, - version: results.version, - numObjects: results.blocks.count, - repoSize: size - }) + list.forEach(block => { + size = size + .plus(block.value.byteLength) + .plus(block.key._buf.byteLength) }) + return { count: count, size: size } } } -function getSize (queryFn, callback) { - pull( - queryFn.query({}), - reduce((sum, block) => { - return sum - .plus(block.value.byteLength) - .plus(block.key._buf.byteLength) - }, new Big(0), callback)) +async function getSize (queryFn) { + let sum = new Big(0) + for await (const block of queryFn.query({})) { + sum.plus(block.value.byteLength) + .plus(block.key._buf.byteLength) + } + return sum } module.exports = IpfsRepo module.exports.repoVersion = repoVersion module.exports.errors = ERRORS -function ignoringIf (cond, cb) { - return (err) => { - cb(err && !cond(err) ? err : null) - } -} -function ignoringAlreadyOpened (cb) { - return ignoringIf((err) => err.message === 'Already open', cb) -} - -function ignoringNotFound (cb) { - return ignoringIf((err) => { - return err && (err.code === ERRORS.ERR_REPO_NOT_INITIALIZED || err.message.startsWith('ENOENT')) - }, cb) -} - function buildOptions (_options) { const options = Object.assign({}, defaultOptions, _options) diff --git a/src/lock-memory.js b/src/lock-memory.js index 9b6c2937..3e13aa26 100644 --- a/src/lock-memory.js +++ b/src/lock-memory.js @@ -1,7 +1,6 @@ 'use strict' const debug = require('debug') -const setImmediate = require('async/setImmediate') const log = debug('repo:lock') @@ -11,41 +10,34 @@ const LOCKS = {} /** * Lock the repo in the given dir. 
- * + * TODO * @param {string} dir - * @param {function(Error, lock)} callback - * @returns {void} + * @returns {Promise} */ -exports.lock = (dir, callback) => { +exports.lock = (dir) => { const file = dir + '/' + lockFile log('locking %s', file) LOCKS[file] = true const closer = { - close (cb) { + close () { if (LOCKS[file]) { delete LOCKS[file] } - setImmediate(cb) } } - setImmediate(() => { - callback(null, closer) - }) + return closer } /** * Check if the repo in the given directory is locked. * * @param {string} dir - * @param {function(Error, bool)} callback - * @returns {void} + * @returns {bool} */ -exports.locked = (dir, callback) => { +exports.locked = (dir) => { const file = dir + '/' + lockFile log('checking lock: %s') const locked = LOCKS[file] - setImmediate(() => { - callback(null, locked) - }) + return locked } diff --git a/src/lock.js b/src/lock.js index b3dbac17..0e7a26de 100644 --- a/src/lock.js +++ b/src/lock.js @@ -22,24 +22,11 @@ const STALE_TIME = 20000 * Lock the repo in the given dir. * * @param {string} dir - * @param {function(Error, lock)} callback - * @returns {void} + * @returns {Object} */ -exports.lock = (dir, callback) => { +exports.lock = async (dir) => { const file = path.join(dir, lockFile) log('locking %s', file) - - lock(dir, { lockfilePath: file, stale: STALE_TIME }) - .then(release => { - callback(null, { - close: (cb) => { - release() - .then(() => cb()) - .catch(err => cb(err)) - } - }) - }, callback) - .catch(err => { - log(err) - }) + const release = await lock(dir, { lockfilePath: file, stale: STALE_TIME }) + return { close: () => release() } } diff --git a/src/spec.js b/src/spec.js index 8cb165f9..26cf880e 100644 --- a/src/spec.js +++ b/src/spec.js @@ -10,35 +10,28 @@ module.exports = (store) => { /** * Check if a datastore spec file exists. 
* - * @param {function(Error, bool)} callback - * @returns {void} + * @returns {Promise} */ - exists (callback) { - store.has(specKey, callback) + exists () { + return store.has(specKey) }, /** * Get the current datastore spec. * - * @param {function(Error, number)} callback - * @returns {void} + * @returns {Promise} */ - get (callback) { - store.get(specKey, (err, buf) => { - if (err) { - return callback(err) - } - callback(null, JSON.parse(buf.toString())) - }) + get () { + return store.get() + .then(buf => JSON.parse(buf.toString())) }, /** * Set the datastore spec of the repo, writing it to the underlying store. - * + * TODO unclear on what the type should be or if it's required * @param {number} spec - * @param {function(Error)} callback - * @returns {void} + * @returns {Promise} */ - set (spec, callback) { - store.put(specKey, Buffer.from(JSON.stringify(sortKeys(spec, { deep: true }))), callback) + set (spec) { + return store.put(specKey, Buffer.from(JSON.stringify(sortKeys(spec, { deep: true })))) } } } diff --git a/src/version.js b/src/version.js index 154112af..1385bdea 100644 --- a/src/version.js +++ b/src/version.js @@ -11,58 +11,44 @@ module.exports = (store) => { /** * Check if a version file exists. * - * @param {function(Error, bool)} callback - * @returns {void} + * @returns {Promise} */ - exists (callback) { - store.has(versionKey, callback) + exists () { + return store.has(versionKey) }, /** * Get the current version. * - * @param {function(Error, number)} callback - * @returns {void} + * @returns {Promise} */ - get (callback) { - store.get(versionKey, (err, buf) => { - if (err) { - return callback(err) - } - callback(null, parseInt(buf.toString().trim(), 10)) - }) + get () { + return this.get(versionKey) + .then(buf => parseInt(buf.toString().trim(), 10)) }, /** * Set the version of the repo, writing it to the underlying store. 
* * @param {number} version - * @param {function(Error)} callback * @returns {void} */ - set (version, callback) { - store.put(versionKey, Buffer.from(String(version)), callback) + set (version) { + return store.put(versionKey, Buffer.from(String(version))) }, /** * Check the current version, and return an error on missmatch * @param {number} expected - * @param {function(Error)} callback * @returns {void} */ - check (expected, callback) { - this.get((err, version) => { - if (err) { - return callback(err) - } - log('comparing version: %s and %s', version, expected) - - // Version 6 and 7 are the same - // TODO: Clean up the compatibility logic. Repo feature detection would be ideal, or a better version schema - const compatibleVersion = (version === 6 && expected === 7) || (expected === 6 && version === 7) + async check (expected) { + const version = await this.get() + log('comparing version: %s and %s', version, expected) + // Version 6 and 7 are the same + // TODO: Clean up the compatibility logic. 
Repo feature detection would be ideal, or a better version schema + const compatibleVersion = (version === 6 && expected === 7) || (expected === 6 && version === 7) - if (version !== expected && !compatibleVersion) { - return callback(new Error(`ipfs repo needs migration: expected version v${expected}, found version v${version}`)) - } - callback() - }) + if (version !== expected && !compatibleVersion) { + throw new Error(`ipfs repo needs migration: expected version v${expected}, found version v${version}`) + } } } } diff --git a/test/blockstore-test.js b/test/blockstore-test.js index a504d925..b39c8122 100644 --- a/test/blockstore-test.js +++ b/test/blockstore-test.js @@ -5,12 +5,9 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect +const assert = chai.assert const Block = require('ipfs-block') const CID = require('cids') -const parallel = require('async/parallel') -const waterfall = require('async/waterfall') -const each = require('async/each') -const map = require('async/map') const _ = require('lodash') const multihashing = require('multihashing-async') @@ -20,230 +17,141 @@ module.exports = (repo) => { const bData = Buffer.from('hello world') let b - before((done) => { - multihashing(bData, 'sha2-256', (err, h) => { - if (err) { - return done(err) - } - - b = new Block(bData, new CID(h)) - done() - }) + before(async () => { + const hash = await multihashing(bData, 'sha2-256') + b = new Block(bData, new CID(hash)) }) describe('.put', () => { - it('simple', (done) => { - repo.blocks.put(b, done) + it('simple', async () => { + await repo.blocks.put(b) }) - it('multi write (locks)', (done) => { - parallel([ - (cb) => repo.blocks.put(b, cb), - (cb) => repo.blocks.put(b, cb) - ], done) + it('multi write (locks)', async () => { + await Promise.all([repo.blocks.put(b), repo.blocks.put(b)]) }) - it('empty value', (done) => { + it('empty value', async () => { const d = Buffer.alloc(0) - multihashing(d, 'sha2-256', (err, multihash) 
=> { - expect(err).to.not.exist() - const empty = new Block(d, new CID(multihash)) - repo.blocks.put(empty, done) - }) + const multihash = await multihashing(d, 'sha2-256') + const empty = new Block(d, new CID(multihash)) + await repo.blocks.put(empty) }) - it('massive multiwrite', function (done) { + it('massive multiwrite', async () => { this.timeout(15000) // add time for ci - waterfall([ - (cb) => map(_.range(100), (i, cb) => { - multihashing(blockData[i], 'sha2-256', cb) - }, cb), - (hashes, cb) => each(_.range(100), (i, cb) => { - const block = new Block(blockData[i], new CID(hashes[i])) - repo.blocks.put(block, cb) - }, cb) - ], done) + const hashes = await Promise.all(_.range(100).map((i) => multihashing(blockData[i], 'sha2-256'))) + await Promise.all(_.range(100).map((i) => { + const block = new Block(blockData[i], new CID(hashes[i])) + return repo.blocks.put(block) + })) }) - it('.putMany', function (done) { + it('.putMany', async () => { this.timeout(15000) // add time for ci - waterfall([ - (cb) => map(_.range(50), (i, cb) => { - const d = Buffer.from('many' + Math.random()) - multihashing(d, 'sha2-256', (err, hash) => { - if (err) { - return cb(err) - } - cb(null, new Block(d, new CID(hash))) - }) - }, cb), - (blocks, cb) => { - repo.blocks.putMany(blocks, (err) => { - expect(err).to.not.exist() - map(blocks, (b, cb) => { - repo.blocks.get(b.cid, cb) - }, (err, res) => { - expect(err).to.not.exist() - expect(res).to.be.eql(blocks) - cb() - }) - }) - } - ], done) + const blocks = await Promise.all(_.range(50).map(async (i) => { + const d = Buffer.from('many' + Math.random()) + const hash = await multihashing(d, 'sha2-256') + return new Block(d, new CID(hash)) + })) + await repo.blocks.putMany(blocks) + blocks.each(async (block) => { + const block1 = await repo.blocks.get(block.cid) + expect(block1).to.be.eql(block) + }) }) - it('returns an error on invalid block', (done) => { - repo.blocks.put('hello', (err) => { + it('returns an error on invalid 
block', async () => { + try { + await repo.blocks.put('hello') + assert.fail() + } catch (err) { expect(err).to.exist() - done() - }) + } }) }) describe('.get', () => { - it('simple', (done) => { - repo.blocks.get(b.cid, (err, block) => { - expect(err).to.not.exist() - expect(block).to.be.eql(b) - done() - }) - }) - - it('massive read', function (done) { - this.timeout(15000) // add time for ci - parallel(_.range(20 * 100).map((i) => (cb) => { - const j = i % blockData.length - waterfall([ - (cb) => multihashing(blockData[j], 'sha2-256', cb), - (h, cb) => { - const cid = new CID(h) - repo.blocks.get(cid, cb) - }, - (block, cb) => { - expect(block.data).to.be.eql(blockData[j]) - cb() - } - ], cb) - }), done) - }) - - it('returns an error on invalid block', (done) => { - repo.blocks.get('woot', (err, val) => { - expect(err).to.exist() - expect(val).to.not.exist() - done() - }) - }) - - it('should get block stored under v0 CID with a v1 CID', done => { - const data = Buffer.from(`TEST${Date.now()}`) - - multihashing(data, 'sha2-256', (err, hash) => { - if (err) return done(err) - - const cid = new CID(hash) - - repo.blocks.put(new Block(data, cid), err => { - if (err) return done(err) - - repo.blocks.get(cid.toV1(), (err, block) => { - expect(err).to.not.exist() - expect(block.data).to.eql(data) - done() - }) - }) - }) + it('simple', async () => { + const block = await repo.blocks.get(b.cid) + expect(block).to.be.eql(b) }) + }) - it('should get block stored under v1 CID with a v0 CID', done => { - const data = Buffer.from(`TEST${Date.now()}`) + it('massive read', async function () { + this.timeout(15000) // add time for ci + await Promise.all(_.range(20 * 100).map(async (i) => { + const j = i % blockData.length + const hash = await multihashing(blockData[j], 'sha2-256') + const block = await repo.blocks.get(new CID(hash)) + block.to.be.eql(blockData[j]) + })) + }) - multihashing(data, 'sha2-256', (err, hash) => { - if (err) return done(err) + it('returns an error on 
invalid block', async () => {
+    try {
+      await repo.blocks.get('woot')
+      assert.fail()
+    } catch (err) {
+      expect(err).to.exist()
+    }
+  })
 
-      const cid = new CID(1, 'dag-pb', hash)
+  it('should get block stored under v0 CID with a v1 CID', async () => {
+    const data = Buffer.from(`TEST${Date.now()}`)
+    const hash = await multihashing(data, 'sha2-256')
+    const cid = new CID(hash)
+    await repo.blocks.put(new Block(data, cid))
+    const block = await repo.blocks.get(cid.toV1())
+    expect(block.data).to.eql(data)
+  })
 
-      repo.blocks.put(new Block(data, cid), err => {
-        if (err) return done(err)
+  it('should get block stored under v1 CID with a v0 CID', async () => {
+    const data = Buffer.from(`TEST${Date.now()}`)
 
-        repo.blocks.get(cid.toV0(), (err, block) => {
-          expect(err).to.not.exist()
-          expect(block.data).to.eql(data)
-          done()
-        })
-      })
-    })
-  })
+    const hash = await multihashing(data, 'sha2-256')
+    const cid = new CID(1, 'dag-pb', hash)
+    await repo.blocks.put(new Block(data, cid))
+    const block = await repo.blocks.get(cid.toV0())
+    expect(block.data).to.eql(data)
   })
 
   describe('.has', () => {
-    it('existing block', (done) => {
-      repo.blocks.has(b.cid, (err, exists) => {
-        expect(err).to.not.exist()
-        expect(exists).to.eql(true)
-        done()
-      })
+    it('existing block', async () => {
+      const exists = await repo.blocks.has(b.cid)
+      expect(exists).to.eql(true)
     })
 
-    it('non existent block', (done) => {
-      repo.blocks.has(new CID('QmbcpFjzamCj5ZZdduW32ctWUPvbGMwQZk2ghWK6PrKswE'), (err, exists) => {
-        expect(err).to.not.exist()
-        expect(exists).to.eql(false)
-        done()
-      })
+    it('non existent block', async () => {
+      const exists = await repo.blocks.has(new CID('QmbcpFjzamCj5ZZdduW32ctWUPvbGMwQZk2ghWK6PrKswE'))
+      expect(exists).to.eql(false)
     })
 
-    it('should have block stored under v0 CID with a v1 CID', done => {
+    it('should have block stored under v0 CID with a v1 CID', async () => {
       const data = Buffer.from(`TEST${Date.now()}`)
-
-      multihashing(data, 'sha2-256', (err, hash) => {
-        
if (err) return done(err) - - const cid = new CID(hash) - - repo.blocks.put(new Block(data, cid), err => { - if (err) return done(err) - - repo.blocks.has(cid.toV1(), (err, exists) => { - expect(err).to.not.exist() - expect(exists).to.eql(true) - done() - }) - }) - }) + const hash = await multihashing(data, 'sha2-256') + const cid = new CID(hash) + await repo.blocks.put(new Block(data, cid)) + const exists = await repo.blocks.has(cid.toV1()) + expect(exists).to.eql(true) }) - it('should have block stored under v1 CID with a v0 CID', done => { + it('should have block stored under v1 CID with a v0 CID', async () => { const data = Buffer.from(`TEST${Date.now()}`) - multihashing(data, 'sha2-256', (err, hash) => { - if (err) return done(err) - - const cid = new CID(1, 'dag-pb', hash) - - repo.blocks.put(new Block(data, cid), err => { - if (err) return done(err) - - repo.blocks.has(cid.toV0(), (err, exists) => { - expect(err).to.not.exist() - expect(exists).to.eql(true) - done() - }) - }) - }) + const hash = await multihashing(data, 'sha2-256') + const cid = new CID(1, 'dag-pb', hash) + await repo.blocks.put(new Block(data, cid)) + const exists = await repo.blocks.has(cid.toV0()) + expect(exists).to.eql(true) }) }) describe('.delete', () => { - it('simple', (done) => { - waterfall([ - (cb) => repo.blocks.delete(b.cid, cb), - (cb) => repo.blocks.has(b.cid, cb) - ], (err, exists) => { - expect(err).to.not.exist() - expect(exists).to.equal(false) - done() - }) + it('simple', async () => { + await repo.blocks.delete(b.cid) + const exists = await repo.blocks.has(b.cid) + expect(exists).to.equal(false) }) }) }) diff --git a/test/datastore-test.js b/test/datastore-test.js index b7fb8c26..e85296c7 100644 --- a/test/datastore-test.js +++ b/test/datastore-test.js @@ -5,9 +5,6 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const parallel = require('async/parallel') -const waterfall = require('async/waterfall') -const each = 
require('async/each') const _ = require('lodash') const Key = require('interface-datastore').Key @@ -18,75 +15,55 @@ module.exports = (repo) => { const b = new Key('hello') describe('.put', () => { - it('simple', (done) => { - repo.datastore.put(b, data, done) + it('simple', async () => { + await repo.datastore.put(b, data) }) - it('multi write (locks)', (done) => { - parallel([ - (cb) => repo.datastore.put(b, data, cb), - (cb) => repo.datastore.put(b, data, cb) - ], done) + it('multi write (locks)', async () => { + await Promise.all([repo.datastore.put(b, data), repo.datastore.put(b, data)]) }) - it('massive multiwrite', function (done) { + it('massive multiwrite', async function () { this.timeout(15000) // add time for ci - each(_.range(100), (i, cb) => { - repo.datastore.put(new Key('hello' + i), dataList[i], cb) - }, done) + await Promise.all(_.range(100).map((i) => { + return repo.datastore.put(new Key('hello' + i), dataList[i]) + })) }) }) describe('.get', () => { - it('simple', (done) => { - repo.datastore.get(b, (err, val) => { - expect(err).to.not.exist() - expect(val).to.be.eql(data) - done() - }) + it('simple', async () => { + const val = await repo.datastore.get(b) + expect(val).to.be.eql(data) }) - it('massive read', function (done) { + it('massive read', async function () { this.timeout(15000) // add time for ci - parallel(_.range(20 * 100).map((i) => (cb) => { + await Promise.all(_.range(20 * 100).map((i) => { const j = i % dataList.length - repo.datastore.get(new Key('hello' + j), (err, val) => { - expect(err).to.not.exist() - expect(val).to.be.eql(dataList[j]) - cb() - }) - }), done) + return repo.datastore.get(new Key('hello' + j)) + .then(val => expect(val).to.be.eql(dataList[j])) + })) }).timeout(10 * 1000) }) describe('.has', () => { - it('existing entry', (done) => { - repo.datastore.has(b, (err, exists) => { - expect(err).to.not.exist() - expect(exists).to.eql(true) - done() - }) + it('existing entry', async () => { + const exists = await 
repo.datastore.has(b) + expect(exists).to.eql(true) }) - it('non existent block', (done) => { - repo.datastore.has(new Key('world'), (err, exists) => { - expect(err).to.not.exist() - expect(exists).to.eql(false) - done() - }) + it('non existent block', async () => { + const exists = await repo.datastore.has(new Key('world')) + expect(exists).to.eql(false) }) }) describe('.delete', () => { - it('simple', (done) => { - waterfall([ - (cb) => repo.datastore.delete(b, cb), - (cb) => repo.datastore.has(b, cb) - ], (err, exists) => { - expect(err).to.not.exist() - expect(exists).to.equal(false) - done() - }) + it('simple', async () => { + await repo.datastore.delete(b) + const exists = await repo.datastore.has(b) + expect(exists).to.equal(false) }) }) }) diff --git a/test/interop-test.js b/test/interop-test.js index 05920c10..7a938b9d 100644 --- a/test/interop-test.js +++ b/test/interop-test.js @@ -7,50 +7,38 @@ const expect = chai.expect const mh = require('multihashes') const CID = require('cids') const Key = require('interface-datastore').Key -const map = require('async/map') module.exports = (repo) => { describe('interop', () => { - it('reads welcome-to-ipfs', (done) => { + it('reads welcome-to-ipfs', async () => { const welcomeHash = mh.fromHexString( '1220120f6af601d46e10b2d2e11ed71c55d25f3042c22501e41d1246e7a1e9d3d8ec' ) - repo.blocks.get(new CID(welcomeHash), (err, val) => { - expect(err).to.not.exist() - expect(val.data.toString()).to.match(/Hello and Welcome to IPFS/) - done() - }) + const val = await repo.blocks.get(new CID(welcomeHash)) + expect(val.data.toString()).to.match(/Hello and Welcome to IPFS/) }) - it('reads a bunch of blocks', (done) => { + it('reads a bunch of blocks', async () => { const cids = [ 'QmUxpzJGJYTK5AzH36jV9ucM2WdF5KhjANb4FAhqnREzuC', 'QmQbb26h9dcU5iNPMNEzYZnZN9YLTXBtFwuHmmo6YU4Aig' ].map((hash) => new CID(mh.fromB58String(hash))) - - map(cids, repo.blocks.get, (err, values) => { - expect(err).to.not.exist() - 
expect(values.length).to.equal(2)
-        expect(values.map((value) => value.data.length)).to.eql([2659, 12783])
-        done()
+      const values = await Promise.all(cids.map(cid => repo.blocks.get(cid)))
+      expect(values.length).to.equal(2)
+      values.forEach((value, i) => {
+        expect(value.data.length).to.eql([2659, 12783][i])
       })
     })
 
-    it('reads pin set from the datastore', (done) => {
-      repo.datastore.get(new Key('/local/pins'), (err, val) => {
-        expect(err).to.not.exist()
-        expect(mh.toB58String(val)).to.equal('QmYAuyf2LzMba65NnhxLtGJxixKNUev9qYSu4MYM88hdwK')
-        done()
-      })
+    it('reads pin set from the datastore', async () => {
+      const val = await repo.datastore.get(new Key('/local/pins'))
+      expect(mh.toB58String(val)).to.equal('QmYAuyf2LzMba65NnhxLtGJxixKNUev9qYSu4MYM88hdwK')
     })
 
-    it('reads DHT records from the datastore', (done) => {
-      repo.datastore.get(new Key('/AHE5I5B7TY'), (err, val) => {
-        expect(err).to.not.exist()
-        expect(val.toString('hex')).to.eql('0a0601c9d4743f9e12097465737476616c75651a2212201d22e2a5e140e5cd20d88fc59cd560f4887c7d9acf938ddb24d7207eac40fd2f')
-        done()
-      })
+    it('reads DHT records from the datastore', async () => {
+      const val = await repo.datastore.get(new Key('/AHE5I5B7TY'))
+      expect(val.toString('hex')).to.eql('0a0601c9d4743f9e12097465737476616c75651a2212201d22e2a5e140e5cd20d88fc59cd560f4887c7d9acf938ddb24d7207eac40fd2f')
     })
   })
 }
diff --git a/test/lock-test.js b/test/lock-test.js
index 84d3e485..60ec2163 100644
--- a/test/lock-test.js
+++ b/test/lock-test.js
@@ -4,28 +4,17 @@ const chai = require('chai')
 chai.use(require('dirty-chai'))
 const expect = chai.expect
 
-const series = require('async/series')
 const IPFSRepo = require('../')
 
 module.exports = (repo) => {
   describe('Repo lock tests', () => {
-    it('should handle locking for a repo lifecycle', (done) => {
+    it('should handle locking for a repo lifecycle', async () => {
       expect(repo.lockfile).to.not.equal(null)
-      series([
-        (cb) => {
-          repo.close(cb)
-        },
-        (cb) => {
-          
expect(repo.lockfile).to.equal(null) - cb() - }, - (cb) => { - repo.open(cb) - } - ], done) + await repo.close() + await repo.open() }) - it('should prevent multiple repos from using the same path', (done) => { + it('should prevent multiple repos from using the same path', async () => { const repoClone = new IPFSRepo(repo.path, repo.options) // Levelup throws an uncaughtException when a lock already exists, catch it @@ -35,29 +24,18 @@ module.exports = (repo) => { expect(err.message).to.match(/already held|IO error|already being hold/) }) - series([ - (cb) => { - try { - repoClone.init({}, cb) - } catch (err) { - cb(err) - } - }, - (cb) => { - repoClone.open(cb) - } - ], function (err) { - // There will be no listeners if the uncaughtException was triggered + try { + await repoClone.init({}) + await repoClone.open() + } catch (err) { if (process.listeners('uncaughtException').length > 0) { expect(err.message).to.match(/already locked|already held|already being hold|ELOCKED/) } - + } finally { // Reset listeners to maintain test integrity process.removeAllListeners('uncaughtException') process.addListener('uncaughtException', mochaExceptionHandler) - - done() - }) + } }) }) } diff --git a/test/node.js b/test/node.js index 3f62f14b..85394714 100644 --- a/test/node.js +++ b/test/node.js @@ -5,10 +5,15 @@ const ncp = require('ncp').ncp const rimraf = require('rimraf') const fs = require('fs') const path = require('path') -const series = require('async/series') +const promisify = require('util').promisify + const chai = require('chai') chai.use(require('dirty-chai')) +const asyncRimraf = promisify(rimraf) +const asyncNcp = promisify(ncp) +const fsstat = promisify(fs.stat) + const IPFSRepo = require('../src') describe('IPFS Repo Tests onNode.js', () => { @@ -16,30 +21,24 @@ describe('IPFS Repo Tests onNode.js', () => { const customLock = { lockName: 'test.lock', - lock: (dir, callback) => { - customLock.locked(dir, (err, isLocked) => { - if (err || isLocked) { - return 
callback(new Error('already locked')) - } - - const lockPath = path.join(dir, customLock.lockName) - fs.writeFileSync(lockPath, '') - - callback(null, { - close: (cb) => { - rimraf(lockPath, cb) - } - }) - }) + lock: async (dir) => { + const isLocked = await customLock.locked(dir) + if (isLocked) { + throw new Error('already locked') + } + const lockPath = path.join(dir, customLock.lockName) + fs.writeFileSync(lockPath, '') + return { + close: () => asyncRimraf(lockPath) + } }, - locked: (dir, callback) => { - fs.stat(path.join(dir, customLock.lockName), (err, stats) => { - if (err) { - callback(null, false) - } else { - callback(null, true) - } - }) + locked: async (dir) => { + try { + await fsstat(path.join(dir, customLock.lockName)) + return true + } catch (err) { + return false + } } } @@ -78,24 +77,18 @@ describe('IPFS Repo Tests onNode.js', () => { const repo = new IPFSRepo(repoPath, r.opts) - before((done) => { - series([ - (cb) => { - if (r.init) { - repo.init({}, cb) - } else { - ncp(testRepoPath, repoPath, cb) - } - }, - (cb) => repo.open(cb) - ], done) + before(async () => { + if (r.init) { + await repo.init({}) + } else { + await asyncNcp(testRepoPath, repoPath) + } + await repo.open() }) - after((done) => { - series([ - (cb) => repo.close(cb), - (cb) => rimraf(repoPath, cb) - ], done) + after(async () => { + await repo.close() + await asyncRimraf(repoPath) }) require('./repo-test')(repo) diff --git a/test/options-test.js b/test/options-test.js index b7afb606..f054ea87 100644 --- a/test/options-test.js +++ b/test/options-test.js @@ -31,8 +31,8 @@ describe('custom options tests', () => { it('allows for a custom lock', () => { const lock = { - lock: (path, callback) => { }, - locked: (path, callback) => { } + lock: async (path) => { }, + locked: async (path) => { } } const repo = new Repo(repoPath, { @@ -42,21 +42,23 @@ describe('custom options tests', () => { expect(repo._getLocker()).to.deep.equal(lock) }) - it('ensures a custom lock has a .close 
method', (done) => { + it('ensures a custom lock has a .close method', async () => { const lock = { - lock: (path, callback) => { - callback(null, {}) + lock: async (path) => { + return {} } } const repo = new Repo(repoPath, { lock }) - - expect( - () => repo._openLock(repo.path) - ).to.throw('Locks must have a close method') - done() + let error + try { + await repo._openLock(repo.path) + } catch (err) { + error = err + } + expect(error.message).to.equal('Locks must have a close method') }) }) diff --git a/test/repo-test.js b/test/repo-test.js index 30eea4f0..2769b0af 100644 --- a/test/repo-test.js +++ b/test/repo-test.js @@ -4,17 +4,12 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const series = require('async/series') -const waterfall = require('async/waterfall') module.exports = (repo) => { describe('IPFS Repo Tests', () => { - it('check if Repo exists', (done) => { - repo.exists((err, exists) => { - expect(err).to.not.exist() - expect(exists).to.equal(true) - done() - }) + it('check if Repo exists', async () => { + const exists = await repo.exists() + expect(exists).to.equal(true) }) it('exposes the path', () => { @@ -22,109 +17,62 @@ module.exports = (repo) => { }) describe('config', () => { - it('get config', (done) => { - repo.config.get((err, config) => { - expect(err).to.not.exist() - expect(config).to.be.a('object') - done() - }) - }) - - it('set config', (done) => { - series([ - (cb) => repo.config.set({ a: 'b' }, cb), - (cb) => repo.config.get((err, config) => { - if (err) return cb(err) - expect(config).to.deep.equal({ a: 'b' }) - cb() - }) - ], done) + it('get config', async () => { + const config = await repo.config.get() + expect(config).to.be.a('object') }) - it('get config key', (done) => { - repo.config.get('a', (err, value) => { - expect(err).to.not.exist() - expect(value).to.equal('b') - done() - }) + it('set config', async () => { + await repo.config.set({ a: 'b' }) + const config = await 
repo.config.get() + expect(config).to.deep.equal({ a: 'b' }) }) - it('get config key should fail with non string key', (done) => { - repo.config.get(1111, (err, value) => { - expect(err).to.exist() - console.log(value) - - done() - }) + it('get config key', async () => { + const value = await repo.config.get('a') + expect(value).to.equal('b') }) - it('set config key', (done) => { - series([ - (cb) => repo.config.set('c.x', 'd', cb), - (cb) => repo.config.get((err, config) => { - if (err) return cb(err) - expect(config).to.deep.equal({ a: 'b', c: { x: 'd' } }) - cb() - }) - ], done) + it('set config key', async () => { + await repo.config.set('c.x', 'd') + const config = await repo.config.get() + expect(config).to.deep.equal({ a: 'b', c: { x: 'd' } }) }) }) describe('spec', () => { - it('get spec', (done) => { - repo.spec.get((err) => { - expect(err).to.not.exist() - done() - }) + it('get spec', async () => { + await repo.spec.get() }) - it('set spec', (done) => { - series([ - (cb) => repo.spec.set({ a: 'b' }, cb), - (cb) => repo.spec.get((err, spec) => { - if (err) return cb(err) - expect(spec).to.deep.equal({ a: 'b' }) - cb() - }) - ], done) + it('set spec', async () => { + await repo.spec.set({ a: 'b' }) + const spec = await repo.spec.get() + expect(spec).to.deep.equal({ a: 'b' }) }) }) describe('version', () => { - it('get version', (done) => { - repo.version.get((err, version) => { - expect(err).to.not.exist() - expect(version).to.equal(7) - done() - }) + it('get version', async () => { + const version = await repo.version.get() + expect(version).to.equal(7) }) - it('set version', (done) => { - waterfall([ - (cb) => repo.version.set(9000, cb), - (cb) => repo.version.get(cb), - (version, cb) => { - expect(version).to.equal(9000) - cb() - }, - (cb) => repo.version.set(7, cb) - ], done) + it('set version', async () => { + await repo.version.set(9000) + await repo.version.get() + await repo.version.set(7) }) }) describe('lifecycle', () => { - it('close and open', 
(done) => { - waterfall([ - (cb) => repo.close(cb), - (cb) => repo.open(cb), - (cb) => repo.close(cb), - (cb) => repo.open(cb), - (cb) => repo.version.get(cb), - (version, cb) => { - expect(version).to.exist() - cb() - } - ], done) + it('close and open', async () => { + await repo.close() + await repo.open() + await repo.close() + await repo.open() + const version = await repo.version.get() + expect(version).to.exist() }) }) }) diff --git a/test/stat-test.js b/test/stat-test.js index b263c368..2db0d8c3 100644 --- a/test/stat-test.js +++ b/test/stat-test.js @@ -7,40 +7,34 @@ const expect = chai.expect module.exports = (repo) => { describe('stat', () => { - it('get stats', (done) => { - repo.stat((err, stats) => { - expect(err).to.not.exist() - expect(stats).to.exist() - expect(stats).to.have.property('numObjects') - expect(stats).to.have.property('version') - expect(stats).to.have.property('repoPath') - expect(stats).to.have.property('repoSize') - expect(stats).to.have.property('storageMax') + it('get stats', async () => { + const stats = await repo.stat() + expect(stats).to.exist() + expect(stats).to.have.property('numObjects') + expect(stats).to.have.property('version') + expect(stats).to.have.property('repoPath') + expect(stats).to.have.property('repoSize') + expect(stats).to.have.property('storageMax') - expect(stats.numObjects > '0').to.eql(true) - expect(stats.version > '0').to.eql(true) - expect(stats.repoSize > '0').to.eql(true) - expect(stats.storageMax > '0').to.eql(true) - done() - }) + expect(stats.numObjects > '0').to.eql(true) + expect(stats.version > '0').to.eql(true) + expect(stats.repoSize > '0').to.eql(true) + expect(stats.storageMax > '0').to.eql(true) }) - it('get human stats', (done) => { - repo.stat({ human: true }, (err, stats) => { - expect(err).to.not.exist() - expect(stats).to.exist() - expect(stats).to.have.property('numObjects') - expect(stats).to.have.property('version') - expect(stats).to.have.property('repoPath') - 
expect(stats).to.have.property('repoSize') - expect(stats).to.have.property('storageMax') + it('get human stats', async () => { + const stats = await repo.stat({ human: true }) + expect(stats).to.exist() + expect(stats).to.have.property('numObjects') + expect(stats).to.have.property('version') + expect(stats).to.have.property('repoPath') + expect(stats).to.have.property('repoSize') + expect(stats).to.have.property('storageMax') - expect(stats.numObjects > '0').to.eql(true) - expect(stats.version > '0').to.eql(true) - expect(stats.repoSize > '0').to.eql(true) - expect(stats.storageMax > '0').to.eql(true) - done() - }) + expect(stats.numObjects > '0').to.eql(true) + expect(stats.version > '0').to.eql(true) + expect(stats.repoSize > '0').to.eql(true) + expect(stats.storageMax > '0').to.eql(true) }) }) } From 0e5a8993a6196ff16621018168f82bf3d5ce430a Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Tue, 26 Feb 2019 10:02:07 -0800 Subject: [PATCH 2/8] Explicitly label async and fix putMany --- src/blockstore.js | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/src/blockstore.js b/src/blockstore.js index d4fd71e1..689b2332 100644 --- a/src/blockstore.js +++ b/src/blockstore.js @@ -85,16 +85,15 @@ function createBaseStore (store) { * @param {Block} block * @returns {Promise} */ - put (block) { + async put (block) { if (!Block.isBlock(block)) { throw new Error('invalid block') } const k = cidToDsKey(block.cid) - return store.has(k).then((exists) => { - if (exists) { return } - return store.put(k, block.data) - }) + const exists = await store.has(k) + if (exists) return + return store.put(k, block.data) }, /** @@ -109,8 +108,12 @@ function createBaseStore (store) { block: b })) - const batch = await store.batch() - const newKeys = await Promise.all(keys.filter((k) => { store.has(k.key) })) + const batch = store.batch() + const newKeys = (await Promise.all(keys.map(async k => { + const exists = await store.has(k.key) + 
return exists ? null : k + }))).filter(Boolean) + newKeys.forEach((k) => { batch.put(k.key, k.block.data) }) @@ -122,18 +125,16 @@ function createBaseStore (store) { * @param {CID} cid * @returns {Promise} */ - has (cid) { + async has (cid) { if (!CID.isCID(cid)) { throw new Error('Not a valid cid') } - return store.has(cidToDsKey(cid)) - .then((exists) => { - if (exists) return exists - const otherCid = cidToOtherVersion(cid) - if (!otherCid) return false - return store.has(cidToDsKey(otherCid)) - }) + const exists = await store.has(cidToDsKey(cid)) + if (exists) return exists + const otherCid = cidToOtherVersion(cid) + if (!otherCid) return false + return store.has(cidToDsKey(otherCid)) }, /** * Delete a block from the store @@ -141,7 +142,7 @@ function createBaseStore (store) { * @param {CID} cid * @returns {Promise} */ - delete (cid) { + async delete (cid) { if (!CID.isCID(cid)) { throw new Error('Not a valid cid') } @@ -152,7 +153,7 @@ function createBaseStore (store) { * * @returns {Promise} */ - close () { + async close () { return store.close() } } From 1c9767939f195ddf0342456ef5a87c045978c6ba Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Tue, 26 Feb 2019 10:15:59 -0800 Subject: [PATCH 3/8] Fix await on init --- src/index.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/index.js b/src/index.js index 717814c1..9de33cfe 100644 --- a/src/index.js +++ b/src/index.js @@ -59,12 +59,12 @@ class IpfsRepo { * @param {Object} config - config to write into `config`. 
* @returns {Promise} */ - init (config) { + async init (config) { log('initializing at: %s', this.path) - return this.root.open() - .then(() => this.config.set(buildConfig(config))) - .then(() => this.spec.set(buildDatastoreSpec(config))) - .then(() => this.version.set(repoVersion)) + await this.root.open() + await this.config.set(buildConfig(config)) + await this.spec.set(buildDatastoreSpec(config)) + await this.version.set(repoVersion) } /** From 3ae87feae7ef8c60a1bdb9118d356467753d9ae1 Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Tue, 26 Feb 2019 10:32:46 -0800 Subject: [PATCH 4/8] Refactor repo initialization to return boolean --- src/index.js | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/src/index.js b/src/index.js index 9de33cfe..27e6b020 100644 --- a/src/index.js +++ b/src/index.js @@ -82,7 +82,14 @@ class IpfsRepo { // check if the repo is already initialized try { await this.root.open() - await this._isInitialized() + const initialized = await this._isInitialized() + if (!initialized) { + throw Object.assign(new Error('repo is not initialized yet'), + { + code: ERRORS.ERR_REPO_NOT_INITIALIZED, + path: this.path + }) + } this.lockfile = await this._openLock(this.path) log('aquired repo.lock') log('creating datastore') @@ -150,28 +157,16 @@ class IpfsRepo { /** * Check if the repo is already initialized. 
* @private - * @returns {Promise} + * @returns {Promise} */ async _isInitialized () { log('init check') - let res - let config, spec, version + let config, spec try { - [config, spec, version] = await Promise.all([this.config.exists(), this.spec.exists(), this.version.check(repoVersion)]) - res = { - config: config, - spec: spec, - version: version - } + [config, spec] = await Promise.all([this.config.exists(), this.spec.exists(), this.version.check(repoVersion)]) + return config && spec } catch (err) { - if (err && !res.config) { - throw Object.assign(new Error('repo is not initialized yet'), - { - code: ERRORS.ERR_REPO_NOT_INITIALIZED, - path: this.path - }) - } - throw err + return false } } From 90c9c74db97bd8ea5dc780708e96a122579d3459 Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Tue, 26 Feb 2019 10:53:34 -0800 Subject: [PATCH 5/8] Label async functions --- src/api-addr.js | 4 ++-- src/config.js | 4 ++-- src/index.js | 2 +- src/lock-memory.js | 2 +- src/lock.js | 2 +- src/spec.js | 10 +++++----- src/version.js | 12 ++++++------ 7 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/api-addr.js b/src/api-addr.js index dc863d60..ed634ea9 100644 --- a/src/api-addr.js +++ b/src/api-addr.js @@ -21,7 +21,7 @@ module.exports = (store) => { * @param {Object} value - the api address to be written * @returns {Promise} */ - set (value) { + async set (value) { return store.put(apiFile, Buffer.from(value.toString())) }, /** @@ -29,7 +29,7 @@ module.exports = (store) => { * * @returns {Promise} */ - delete () { + async delete () { return store.delete(apiFile) } } diff --git a/src/config.js b/src/config.js index ce9541a2..a6ad51f8 100644 --- a/src/config.js +++ b/src/config.js @@ -20,7 +20,7 @@ module.exports = (store) => { * @param {String} key - the config key to get * @returns {Promise} */ - get (key) { + async get (key) { if (!key) { key = undefined } @@ -61,7 +61,7 @@ module.exports = (store) => { * * @returns {Promise} */ - exists () { + async exists () 
{ return store.has(configKey) } } diff --git a/src/index.js b/src/index.js index 27e6b020..e4c81ed1 100644 --- a/src/index.js +++ b/src/index.js @@ -148,7 +148,7 @@ class IpfsRepo { * * @returns {Promise} */ - _closeLock () { + async _closeLock () { if (this.lockfile) { return this.lockfile.close() } diff --git a/src/lock-memory.js b/src/lock-memory.js index 3e13aa26..2f4a059a 100644 --- a/src/lock-memory.js +++ b/src/lock-memory.js @@ -14,7 +14,7 @@ const LOCKS = {} * @param {string} dir * @returns {Promise} */ -exports.lock = (dir) => { +exports.lock = async (dir) => { const file = dir + '/' + lockFile log('locking %s', file) LOCKS[file] = true diff --git a/src/lock.js b/src/lock.js index 0e7a26de..c3a7e3dd 100644 --- a/src/lock.js +++ b/src/lock.js @@ -22,7 +22,7 @@ const STALE_TIME = 20000 * Lock the repo in the given dir. * * @param {string} dir - * @returns {Object} + * @returns {Promise} */ exports.lock = async (dir) => { const file = path.join(dir, lockFile) diff --git a/src/spec.js b/src/spec.js index 26cf880e..6bf6522a 100644 --- a/src/spec.js +++ b/src/spec.js @@ -12,7 +12,7 @@ module.exports = (store) => { * * @returns {Promise} */ - exists () { + async exists () { return store.has(specKey) }, /** @@ -20,9 +20,9 @@ module.exports = (store) => { * * @returns {Promise} */ - get () { - return store.get() - .then(buf => JSON.parse(buf.toString())) + async get () { + const buf = await store.get() + return JSON.parse(buf.toString()) }, /** * Set the datastore spec of the repo, writing it to the underlying store. 
@@ -30,7 +30,7 @@ module.exports = (store) => { * @param {number} spec * @returns {Promise} */ - set (spec) { + async set (spec) { return store.put(specKey, Buffer.from(JSON.stringify(sortKeys(spec, { deep: true })))) } } diff --git a/src/version.js b/src/version.js index 1385bdea..448982cd 100644 --- a/src/version.js +++ b/src/version.js @@ -13,7 +13,7 @@ module.exports = (store) => { * * @returns {Promise} */ - exists () { + async exists () { return store.has(versionKey) }, /** @@ -21,17 +21,17 @@ module.exports = (store) => { * * @returns {Promise} */ - get () { - return this.get(versionKey) - .then(buf => parseInt(buf.toString().trim(), 10)) + async get () { + const buf = await this.get(versionKey) + return parseInt(buf.toString().trim(), 10) }, /** * Set the version of the repo, writing it to the underlying store. * * @param {number} version - * @returns {void} + * @returns {Promise} */ - set (version) { + async set (version) { return store.put(versionKey, Buffer.from(String(version))) }, /** From 16ce0609f92ac0c0515eb1ceb98327e9ab4d5d4b Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Tue, 26 Feb 2019 11:11:58 -0800 Subject: [PATCH 6/8] Add async await to browser test --- test/browser.js | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/test/browser.js b/test/browser.js index f5490093..c5f318ef 100644 --- a/test/browser.js +++ b/test/browser.js @@ -2,23 +2,19 @@ 'use strict' -const series = require('async/series') - const IPFSRepo = require('../src') describe('IPFS Repo Tests on the Browser', () => { require('./options-test') const repo = new IPFSRepo('myrepo') - before((done) => { - series([ - (cb) => repo.init({}, cb), - (cb) => repo.open(cb) - ], done) + before(async () => { + await repo.init({}) + await repo.open() }) - after((done) => { - repo.close(done) + after(async () => { + await repo.close() }) require('./repo-test')(repo) From 85c92149f3079017f88ff76f6d80cdccb5c4111a Mon Sep 17 00:00:00 2001 From: dirkmc Date: Sun, 31 
Mar 2019 00:27:46 -0400 Subject: [PATCH 7/8] fix: several bug fixes --- README.md | 101 ++++++++++++++++----------------------- example.js | 13 +++-- package.json | 11 +++-- src/api-addr.js | 2 +- src/blockstore.js | 2 +- src/config.js | 42 +++++++++-------- src/errors/index.js | 2 + src/index.js | 102 +++++++++++++++++++++++++--------------- src/spec.js | 6 +-- src/version.js | 2 +- test/blockstore-test.js | 75 ++++++++++++++--------------- test/datastore-test.js | 6 +-- test/interop-test.js | 9 ++-- test/repo-test.js | 24 ++++++++++ 14 files changed, 221 insertions(+), 176 deletions(-) diff --git a/README.md b/README.md index 052ab363..b003c324 100644 --- a/README.md +++ b/README.md @@ -107,19 +107,9 @@ Example: const Repo = require('ipfs-repo') const repo = new Repo('/tmp/ipfs-repo') -repo.init({ cool: 'config' }, (err) => { - if (err) { - throw err - } - - repo.open((err) => { - if (err) { - throw err - } - - console.log('repo is ready') - }) -}) +await repo.init({ cool: 'config' }) +await repo.open() +console.log('repo is ready') ``` This now has created the following structure, either on disk or as an in memory representation: @@ -157,63 +147,61 @@ Arguments: const repo = new Repo('path/to/repo') ``` -#### `repo.init (callback)` +#### `Promise repo.init ()` Creates the necessary folder structure inside the repo. -#### `repo.open (callback)` +#### `Promise repo.open ()` [Locks](https://en.wikipedia.org/wiki/Record_locking) the repo to prevent conflicts arising from simultaneous access. -#### `repo.close (callback)` +#### `Promise repo.close ()` Unlocks the repo. -#### `repo.exists (callback)` +#### `Promise repo.exists ()` -Tells whether this repo exists or not. Calls back with `(err, bool)`. +Tells whether this repo exists or not. Returns a boolean. ### Repos Root repo: -#### `repo.put (key, value:Buffer, callback)` +#### `Promise repo.put (key, value:Buffer)` Put a value at the root of the repo. 
* `key` can be a buffer, a string or a [Key](https://github.com/ipfs/interface-datastore#keys). -#### `repo.get (key, callback)` +#### `Promise repo.get (key)` Get a value at the root of the repo. * `key` can be a buffer, a string or a [Key](https://github.com/ipfs/interface-datastore#keys). -* `callback` is a callback function `function (err, result:Buffer)` [Blocks](https://github.com/ipfs/js-ipfs-block#readme): -#### `repo.blocks.put (block:Block, callback)` +#### `Promise repo.blocks.put (block:Block)` * `block` should be of type [Block](https://github.com/ipfs/js-ipfs-block#readme). -#### `repo.blocks.putMany (blocks, callback)` +#### `Promise repo.blocks.putMany (blocks)` Put many blocks. * `block` should be an array of type [Block](https://github.com/ipfs/js-ipfs-block#readme). -#### `repo.blocks.get (cid, callback)` +#### `Promise repo.blocks.get (cid)` Get block. * `cid` is the content id of [type CID](https://github.com/ipld/js-cid#readme). -* `callback` is a callback function `function (err, result:Buffer)` Datastore: #### `repo.datastore` -This is contains a full implementation of [the `interface-datastore` API](https://github.com/ipfs/interface-datastore#api). +This contains a full implementation of [the `interface-datastore` API](https://github.com/ipfs/interface-datastore#api). ### Utils @@ -222,77 +210,70 @@ This is contains a full implementation of [the `interface-datastore` API](https: Instead of using `repo.set('config')` this exposes an API that allows you to set and get a decoded config object, as well as, in a safe manner, change any of the config values individually. -##### `repo.config.set(key:string, value, callback)` +##### `Promise repo.config.set(key:string, value)` Set a config value. `value` can be any object that is serializable to JSON. * `key` is a string specifying the object path. 
Example: ```js -repo.config.set('a.b.c', 'c value', (err) => { - if (err) { throw err } - repo.config.get((err, config) => { - if (err) { throw err } - assert.equal(config.a.b.c, 'c value') - }) -}) +await repo.config.set('a.b.c', 'c value') +const config = await repo.config.get() +assert.equal(config.a.b.c, 'c value') ``` -##### `repo.config.get(value, callback)` +##### `Promise repo.config.set(value)` Set the whole config value. `value` can be any object that is serializable to JSON. -##### `repo.config.get(key:string, callback)` +##### `Promise repo.config.get(key:string)` -Get a config value. `callback` is a function with the signature: `function (err, value)`, wehre the ` -value` is of the same type that was set before. +Get a config value. Returns the same type that was set before. * `key` is a string specifying the object path. Example: ```js -repo.config.get('a.b.c', (err, value) => { - if (err) { throw err } - console.log('config.a.b.c = ', value) -}) +const value = await repo.config.get('a.b.c') +console.log('config.a.b.c = ', value) ``` -##### `repo.config.get(callback)` +##### `Promise repo.config.get()` -Get the entire config value. `callback` is a function with the signature: `function (err, configValue:Object)`. +Get the entire config value. -#### `repo.config.exists(callback)` +#### `Promise repo.config.exists()` -Whether the config sub-repo exists. Calls back with `(err, bool)`. +Whether the config sub-repo exists. #### `repo.version` -##### `repo.version.get (callback)` +##### `Promise repo.version.get ()` -Gets the repo version. +Gets the repo version (an integer). -##### `repo.version.set (version:number, callback)` +##### `Promise repo.version.set (version:number)` Sets the repo version #### `repo.apiAddr` -#### `repo.apiAddr.get (callback)` +#### `Promise repo.apiAddr.get ()` Gets the API address. -#### `repo.apiAddr.set (value, callback)` +#### `Promise repo.apiAddr.set (value)` Sets the API address. 
* `value` should be a [Multiaddr](https://github.com/multiformats/js-multiaddr) or a String representing a valid one. -### `repo.stat ([options], callback)` +### `Promise repo.stat ([options])` Gets the repo status. `options` is an object which might contain the key `human`, which is a boolean indicating whether or not the `repoSize` should be displayed in MiB or not. -`callback` is a function with the signature `function (err, stats)`, where `stats` is an Object with the following keys: +Returns an Object with the following keys: - `numObjects` - `repoPath` @@ -311,27 +292,27 @@ const memoryLock = require('ipfs-repo/src/lock-memory') // Default in browser You can also provide your own custom Lock. It must be an object with the following interface: -#### `lock.lock (dir, callback)` +#### `Promise lock.lock (dir)` -Sets the lock if one does not already exist. If a lock already exists, `callback` should be called with an error. +Sets the lock if one does not already exist. If a lock already exists, should throw an error. `dir` is a string to the directory the lock should be created at. The repo typically creates the lock at its root. -`callback` is a function with the signature `function (err, closer)`, where `closer` has a `close` method for removing the lock. +Returns `closer`, where `closer` has a `close` method for removing the lock. -##### `closer.close (callback)` +##### `Promise closer.close ()` Closes the lock created by `lock.open` -`callback` is a function with the signature `function (err)`. If no error was returned, the lock was successfully removed. +If no error was thrown, the lock was successfully removed. -#### `lock.locked (dir, callback)` +#### `Promise lock.locked (dir)` Checks the existence of the lock. `dir` is a string to the directory to check for the lock. The repo typically checks for the lock at its root. -`callback` is a function with the signature `function (err, boolean)`, where `boolean` indicates the existence of the lock. 
+Returns a `boolean` indicating the existence of the lock. ## Notes diff --git a/example.js b/example.js index 9f26bdf8..5768b1fa 100644 --- a/example.js +++ b/example.js @@ -1,8 +1,11 @@ 'use strict' -const Repo = require('ipfs-repo') -const repo = new Repo('/Users/awesome/.jsipfs') +const Repo = require('ipfs-repo'); -repo.init({ my: 'config' }) - .then(repo.open) - .then(() => console.log('repo is ready')) +(async () => { + const repo = new Repo('/Users/awesome/.jsipfs') + + await repo.init({ my: 'config' }) + await repo.open() + console.log('repo is ready') +})() diff --git a/package.json b/package.json index e974b1b7..2e851db7 100644 --- a/package.json +++ b/package.json @@ -48,7 +48,7 @@ "lodash": "^4.17.11", "memdown": "^3.0.0", "multihashes": "~0.4.14", - "multihashing-async": "~0.5.2", + "multihashing-async": "multiformats/js-multihashing-async#feat/async-iterators", "ncp": "^2.0.0", "rimraf": "^2.6.3" }, @@ -57,17 +57,20 @@ "bignumber.js": "^8.1.1", "buffer": "^5.2.1", "cids": "~0.5.8", - "datastore-core": "~0.6.0", - "datastore-fs": "~0.7.0", + "datastore-core": "git://github.com/zcstarr/js-datastore-core.git", + "datastore-fs": "git://github.com/zcstarr/js-datastore-fs.git", "datastore-level": "git://github.com/ipfs/js-datastore-level.git#refactor/async-iterators", "debug": "^4.1.0", + "err-code": "^1.1.2", "interface-datastore": "git://github.com/ipfs/interface-datastore.git#refactor/async-iterators", "ipfs-block": "~0.8.0", "just-safe-set": "^2.1.0", "multiaddr": "^6.0.6", "proper-lockfile": "^4.0.0", "pull-stream": "^3.6.9", - "sort-keys": "^2.0.0" + "p-queue": "^3.1.0", + "sort-keys": "^2.0.0", + "streaming-iterables": "^4.0.2" }, "license": "MIT", "contributors": [ diff --git a/src/api-addr.js b/src/api-addr.js index ed634ea9..fec0cd96 100644 --- a/src/api-addr.js +++ b/src/api-addr.js @@ -9,7 +9,7 @@ module.exports = (store) => { /** * Get the current configuration from the repo. 
* - * @returns {Promise} + * @returns {Promise} */ async get () { const value = await store.get(apiFile) diff --git a/src/blockstore.js b/src/blockstore.js index 689b2332..690f0d64 100644 --- a/src/blockstore.js +++ b/src/blockstore.js @@ -153,7 +153,7 @@ function createBaseStore (store) { * * @returns {Promise} */ - async close () { + close () { return store.close() } } diff --git a/src/config.js b/src/config.js index a6ad51f8..85c7aa84 100644 --- a/src/config.js +++ b/src/config.js @@ -1,7 +1,7 @@ 'use strict' const Key = require('interface-datastore').Key -const queue = require('async/queue') +const Queue = require('p-queue') const _get = require('lodash.get') const _set = require('lodash.set') const _has = require('lodash.has') @@ -10,8 +10,7 @@ const Buffer = require('safe-buffer').Buffer const configKey = new Key('config') module.exports = (store) => { - const setQueue = queue(_doSet, 1) - setQueue.error = (err) => { throw err } + const setQueue = new Queue({ concurrency: 1 }) const configStore = { /** @@ -24,16 +23,17 @@ module.exports = (store) => { if (!key) { key = undefined } - return store.get(configKey) - .then((encodedValue) => { - const config = JSON.parse(encodedValue.toString()) - if (key !== undefined && !_has(config, key)) { - throw new Error(`Key ${key} does not exist in config`) - } - const value = key !== undefined ? _get(config, key) : config - return value - }) + + const encodedValue = await store.get(configKey) + const config = JSON.parse(encodedValue.toString()) + if (key !== undefined && !_has(config, key)) { + throw new Error(`Key ${key} does not exist in config`) + } + + const value = key !== undefined ? _get(config, key) : config + return value }, + /** * Set the current configuration for this repo. 
* @@ -42,18 +42,21 @@ module.exports = (store) => { * @returns {void} */ set (key, value) { - if (!key || typeof key !== 'string') { - throw new Error('Invalid key type') + if (arguments.length === 1) { + value = key + key = undefined + } else if (!key || typeof key !== 'string') { + throw new Error('Invalid key type: ' + typeof key) } if (value === undefined || Buffer.isBuffer(value)) { - throw new Error('Invalid value type') + throw new Error('Invalid value type: ' + typeof value) } - setQueue.push({ + return setQueue.add(() => _doSet({ key: key, value: value - }) + })) }, /** @@ -61,7 +64,7 @@ module.exports = (store) => { * * @returns {Promise} */ - async exists () { + exists () { return store.has(configKey) } } @@ -74,8 +77,9 @@ module.exports = (store) => { if (key) { const config = await configStore.get() _set(config, key, value) - await _saveAll(config) + return _saveAll(config) } + return _saveAll(value) } function _saveAll (config) { diff --git a/src/errors/index.js b/src/errors/index.js index 3e0f3647..78c51a40 100644 --- a/src/errors/index.js +++ b/src/errors/index.js @@ -1,3 +1,5 @@ 'use strict' exports.ERR_REPO_NOT_INITIALIZED = 'ERR_REPO_NOT_INITIALIZED' +exports.ERR_REPO_ALREADY_OPEN = 'ERR_REPO_ALREADY_OPEN' +exports.ERR_REPO_ALREADY_CLOSED = 'ERR_REPO_ALREADY_CLOSED' diff --git a/src/index.js b/src/index.js index e4c81ed1..bd3adb64 100644 --- a/src/index.js +++ b/src/index.js @@ -5,6 +5,8 @@ const assert = require('assert') const path = require('path') const debug = require('debug') const Big = require('bignumber.js') +const errcode = require('err-code') +const { collect } = require('streaming-iterables') const backends = require('./backends') const version = require('./version') @@ -61,37 +63,30 @@ class IpfsRepo { */ async init (config) { log('initializing at: %s', this.path) - await this.root.open() + await this._openRoot() await this.config.set(buildConfig(config)) await this.spec.set(buildDatastoreSpec(config)) await 
this.version.set(repoVersion) } /** - * Open the repo. If the repo is already open no action will be taken. - * If the repo is not initialized it will return an error. + * Open the repo. If the repo is already open an error will be thrown. + * If the repo is not initialized it will throw an error. * * @returns {Promise} */ async open () { if (!this.closed) { - throw new Error('repo is already open') + throw errcode('repo is already open', ERRORS.ERR_REPO_ALREADY_OPEN) } log('opening at: %s', this.path) // check if the repo is already initialized try { - await this.root.open() - const initialized = await this._isInitialized() - if (!initialized) { - throw Object.assign(new Error('repo is not initialized yet'), - { - code: ERRORS.ERR_REPO_NOT_INITIALIZED, - path: this.path - }) - } + await this._openRoot() + await this._checkInitialized() this.lockfile = await this._openLock(this.path) - log('aquired repo.lock') + log('acquired repo.lock') log('creating datastore') this.datastore = backends.create('datastore', path.join(this.path, 'datastore'), this.options) log('creating blocks') @@ -102,13 +97,14 @@ class IpfsRepo { this.closed = false log('all opened') } catch (err) { - if (err && this.lockfile) { - try { - this._closeLock() - } catch (err2) { - log('error removing lock', err2) - } + if (!this.lockfile) { + throw err + } + try { + await this._closeLock() this.lockfile = null + } catch (err2) { + log('error removing lock', err2) throw err } } @@ -130,6 +126,20 @@ class IpfsRepo { return this.options.lock } + /** + * Opens the root backend, catching and ignoring an 'Already open' error + * @returns {Promise} + */ + async _openRoot () { + try { + await this.root.open() + } catch (err) { + if (err.message !== 'Already open') { + throw err + } + } + } + /** * Creates a lock on the repo if a locker is specified. The lockfile object will * be returned in the callback if one has been created. 
@@ -157,16 +167,25 @@ class IpfsRepo { /** * Check if the repo is already initialized. * @private - * @returns {Promise} + * @returns {Promise} */ - async _isInitialized () { + async _checkInitialized () { log('init check') - let config, spec + let config try { - [config, spec] = await Promise.all([this.config.exists(), this.spec.exists(), this.version.check(repoVersion)]) - return config && spec + [config] = await Promise.all([ + this.config.exists(), + this.spec.exists(), + this.version.check(repoVersion) + ]) } catch (err) { - return false + if (!config) { + throw Object.assign( + errcode('repo is not initialized yet', ERRORS.ERR_REPO_NOT_INITIALIZED), { + path: this.path + }) + } + throw err } } @@ -177,10 +196,19 @@ class IpfsRepo { */ async close () { if (this.closed) { - throw new Error('repo is already closed') + throw errcode('repo is already closed', ERRORS.ERR_REPO_ALREADY_CLOSED) } log('closing at: %s', this.path) - await this.apiAddr.delete() + + try { + // Delete api, ignoring irrelevant errors + await this.apiAddr.delete() + } catch (err) { + if (err.code !== ERRORS.ERR_REPO_NOT_INITIALIZED && !err.message.startsWith('ENOENT')) { + throw err + } + } + await Promise.all([this.blocks, this.keys, this.datastore].map((store) => store.close())) log('unlocking') this.closed = true @@ -230,17 +258,17 @@ class IpfsRepo { } } - _storageMaxStat () { - return this.config.get('Datastore.StorageMax') - .then((max) => new Big(max)) - .catch(() => new Big(noLimit)) + async _storageMaxStat () { + try { + const max = await this.config.get('Datastore.StorageMax') + return new Big(max) + } catch (err) { + return new Big(noLimit) + } } async _blockStat () { - const list = [] - for await (const block of this.blocks.query({})) { - list.push(block) - } + const list = await collect(this.blocks.query({})) const count = new Big(list.length) let size = new Big(0) @@ -249,7 +277,7 @@ class IpfsRepo { .plus(block.value.byteLength) .plus(block.key._buf.byteLength) }) - return { 
count: count, size: size } + return { count, size } } } diff --git a/src/spec.js b/src/spec.js index 6bf6522a..d158878f 100644 --- a/src/spec.js +++ b/src/spec.js @@ -12,7 +12,7 @@ module.exports = (store) => { * * @returns {Promise} */ - async exists () { + exists () { return store.has(specKey) }, /** @@ -21,7 +21,7 @@ module.exports = (store) => { * @returns {Promise} */ async get () { - const buf = await store.get() + const buf = await store.get(specKey) return JSON.parse(buf.toString()) }, /** @@ -30,7 +30,7 @@ module.exports = (store) => { * @param {number} spec * @returns {Promise} */ - async set (spec) { + set (spec) { return store.put(specKey, Buffer.from(JSON.stringify(sortKeys(spec, { deep: true })))) } } diff --git a/src/version.js b/src/version.js index 448982cd..ca4e7be6 100644 --- a/src/version.js +++ b/src/version.js @@ -22,7 +22,7 @@ module.exports = (store) => { * @returns {Promise} */ async get () { - const buf = await this.get(versionKey) + const buf = await store.get(versionKey) return parseInt(buf.toString().trim(), 10) }, /** diff --git a/test/blockstore-test.js b/test/blockstore-test.js index b39c8122..6da9c287 100644 --- a/test/blockstore-test.js +++ b/test/blockstore-test.js @@ -38,7 +38,7 @@ module.exports = (repo) => { await repo.blocks.put(empty) }) - it('massive multiwrite', async () => { + it('massive multiwrite', async function () { this.timeout(15000) // add time for ci const hashes = await Promise.all(_.range(100).map((i) => multihashing(blockData[i], 'sha2-256'))) await Promise.all(_.range(100).map((i) => { @@ -47,7 +47,7 @@ module.exports = (repo) => { })) }) - it('.putMany', async () => { + it('.putMany', async function () { this.timeout(15000) // add time for ci const blocks = await Promise.all(_.range(50).map(async (i) => { const d = Buffer.from('many' + Math.random()) @@ -55,10 +55,10 @@ module.exports = (repo) => { return new Block(d, new CID(hash)) })) await repo.blocks.putMany(blocks) - blocks.each(async (block) => { + for 
(const block of blocks) { const block1 = await repo.blocks.get(block.cid) expect(block1).to.be.eql(block) - }) + } }) it('returns an error on invalid block', async () => { @@ -76,44 +76,45 @@ module.exports = (repo) => { const block = await repo.blocks.get(b.cid) expect(block).to.be.eql(b) }) - }) - it('massive read', async function () { - this.timeout(15000) // add time for ci - await Promise.all(_.range(20 * 100).map(async (i) => { - const j = i % blockData.length - const hash = await multihashing(blockData[j], 'sha2-256') - const block = await repo.blocks.get(new CID(hash)) - block.to.be.eql(blockData[j]) - })) - }) + it('massive read', async function () { + this.timeout(15000) // add time for ci + await Promise.all(_.range(20 * 100).map(async (i) => { + const j = i % blockData.length + const hash = await multihashing(blockData[j], 'sha2-256') + const block = await repo.blocks.get(new CID(hash)) + expect(block.data).to.be.eql(blockData[j]) + })) + }) - it('returns an error on invalid block', async () => { - try { - await repo.blocks.get('woot') - } catch (err) { - expect(err).to.exist() - } - assert.fail() - }) + it('returns an error on invalid block', async () => { + try { + await repo.blocks.get('woot') + } catch (err) { + expect(err).to.exist() + return + } + assert.fail() + }) - it('should get block stored under v0 CID with a v1 CID', async () => { - const data = Buffer.from(`TEST${Date.now()}`) - const hash = await multihashing(data, 'sha2-256') - const cid = new CID(hash) - await repo.blocks.put(new Block(data, cid)) - const block = await repo.blocks.get(cid.toV1()) - expect(block.data).to.eql(data) - }) + it('should get block stored under v0 CID with a v1 CID', async () => { + const data = Buffer.from(`TEST${Date.now()}`) + const hash = await multihashing(data, 'sha2-256') + const cid = new CID(hash) + await repo.blocks.put(new Block(data, cid)) + const block = await repo.blocks.get(cid.toV1()) + expect(block.data).to.eql(data) + }) - it('should get block 
stored under v1 CID with a v0 CID', async () => { - const data = Buffer.from(`TEST${Date.now()}`) + it('should get block stored under v1 CID with a v0 CID', async () => { + const data = Buffer.from(`TEST${Date.now()}`) - const hash = await multihashing(data, 'sha2-256') - const cid = new CID(1, 'dag-pb', hash) - await repo.blocks.put(new Block(data, cid)) - const block = await repo.blocks.get(cid.toV0()) - expect(block.data).to.empty(data) + const hash = await multihashing(data, 'sha2-256') + const cid = new CID(1, 'dag-pb', hash) + await repo.blocks.put(new Block(data, cid)) + const block = await repo.blocks.get(cid.toV0()) + expect(block.data).to.eql(data) + }) }) describe('.has', () => { diff --git a/test/datastore-test.js b/test/datastore-test.js index e85296c7..e77e3f39 100644 --- a/test/datastore-test.js +++ b/test/datastore-test.js @@ -39,10 +39,10 @@ module.exports = (repo) => { it('massive read', async function () { this.timeout(15000) // add time for ci - await Promise.all(_.range(20 * 100).map((i) => { + await Promise.all(_.range(20 * 100).map(async (i) => { const j = i % dataList.length - return repo.datastore.get(new Key('hello' + j)) - .then(val => expect(val).to.be.eql(dataList[j])) + const val = await repo.datastore.get(new Key('hello' + j)) + expect(val).to.be.eql(dataList[j]) })) }).timeout(10 * 1000) }) diff --git a/test/interop-test.js b/test/interop-test.js index 7a938b9d..393044fb 100644 --- a/test/interop-test.js +++ b/test/interop-test.js @@ -24,11 +24,10 @@ module.exports = (repo) => { 'QmUxpzJGJYTK5AzH36jV9ucM2WdF5KhjANb4FAhqnREzuC', 'QmQbb26h9dcU5iNPMNEzYZnZN9YLTXBtFwuHmmo6YU4Aig' ].map((hash) => new CID(mh.fromB58String(hash))) - const values = await Promise.all(cids.map(cid => repo.blocks.get(cid))) - values.forEach((value) => { - expect(value.length).to.equal(2) - expect(value.map(val => val.data.length)).to.eql([2659, 12783]) - }) + + const values = await Promise.all(cids.map((cid) => repo.blocks.get(cid))) + 
expect(values.length).to.equal(2) + expect(values.map((value) => value.data.length)).to.eql([2659, 12783]) }) it('reads pin set from the datastore', async () => { diff --git a/test/repo-test.js b/test/repo-test.js index 2769b0af..d423f560 100644 --- a/test/repo-test.js +++ b/test/repo-test.js @@ -5,6 +5,8 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect +const Error = require('../src/errors') + module.exports = (repo) => { describe('IPFS Repo Tests', () => { it('check if Repo exists', async () => { @@ -74,6 +76,28 @@ module.exports = (repo) => { const version = await repo.version.get() expect(version).to.exist() }) + + it('close twice throws error', async () => { + await repo.close() + try { + await repo.close() + } catch (err) { + expect(err.code).to.eql(Error.ERR_REPO_ALREADY_CLOSED) + return + } + expect.fail('Did not throw') + }) + + it('open twice throws error', async () => { + await repo.open() + try { + await repo.open() + } catch (err) { + expect(err.code).to.eql(Error.ERR_REPO_ALREADY_OPEN) + return + } + expect.fail('Did not throw') + }) }) }) } From 442b2cfec21f33a2a484c2aa1911a0f3dc355050 Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Sat, 30 Mar 2019 21:31:39 -0700 Subject: [PATCH 8/8] fix: add async label to async functions --- src/blockstore.js | 2 +- src/spec.js | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/blockstore.js b/src/blockstore.js index 690f0d64..689b2332 100644 --- a/src/blockstore.js +++ b/src/blockstore.js @@ -153,7 +153,7 @@ function createBaseStore (store) { * * @returns {Promise} */ - close () { + async close () { return store.close() } } diff --git a/src/spec.js b/src/spec.js index d158878f..312b9f49 100644 --- a/src/spec.js +++ b/src/spec.js @@ -12,7 +12,7 @@ module.exports = (store) => { * * @returns {Promise} */ - exists () { + async exists () { return store.has(specKey) }, /** @@ -30,7 +30,7 @@ module.exports = (store) => { * @param {number} spec * @returns 
{Promise} */ - set (spec) { + async set (spec) { return store.put(specKey, Buffer.from(JSON.stringify(sortKeys(spec, { deep: true })))) } }