diff --git a/README.md b/README.md index 9d7af00..26ec2e7 100644 --- a/README.md +++ b/README.md @@ -17,11 +17,17 @@ ## Table of Contents -- [Install](#install) -- [Usage](#usage) -- [API](#api) -- [Contribute](#contribute) -- [License](#license) + - [Lead Maintainer](#lead-maintainer) + - [Table of Contents](#table-of-contents) + - [Install](#install) + - [Usage](#usage) + - [API](#api) + - [Setup](#setup) + - [Get](#get) + - [Put](#put) + - [Unsubscribe](#unsubscribe) + - [Contribute](#contribute) + - [License](#license) ### Install @@ -62,7 +68,7 @@ const dsPubsub = new DatastorePubsub(pubsub, datastore, peerId, validator) #### Get ```js -dsPubsub.get(key, callback) +const buf = await dsPubsub.get(key) ``` Try to subscribe a topic with Pubsub and receive the current local value if available. @@ -70,14 +76,13 @@ Try to subscribe a topic with Pubsub and receive the current local value if avai Arguments: - `key` (Buffer): a key representing a unique identifier of the object to subscribe. -- `callback` (function): operation result. -`callback` must follow `function (err, data) {}` signature, where `err` is an error if the operation was not successful. If no `err` is received, a `data` is received containing the most recent known record stored (`Buffer`). +Returns `Promise` containing the most recent known record stored. #### Put ```js -dsPubsub.put(key, val, callback) +await dsPubsub.put(key, val) ``` Publishes a value through pubsub. @@ -86,14 +91,13 @@ Arguments: - `key` (Buffer): a key representing a unique identifier of the object to publish. - `val` (Buffer): value to be propagated. -- `callback` (function): operation result. -`callback` must follow `function (err) {}` signature, where `err` is an error if the operation was not successful. +Returns `Promise` #### Unsubscribe ```js -dsPubsub.unsubscribe(key, callback) +await dsPubsub.unsubscribe(key) ``` Unsubscribe a previously subscribe value. @@ -102,6 +106,8 @@ Arguments: - `key` (Buffer): a key representing a unique identifier of the object to publish. +Returns `Promise` + ## Contribute Feel free to join in. All welcome. Open an [issue](https://github.com/ipfs/js-ipns/issues)! diff --git a/package.json b/package.json index bbb8a04..6ee50e1 100644 --- a/package.json +++ b/package.json @@ -32,22 +32,23 @@ }, "homepage": "https://github.com/ipfs/js-datastore-pubsub#readme", "dependencies": { - "assert": "^1.4.1", - "async": "^2.6.2", "debug": "^4.1.1", - "err-code": "^1.1.2", - "interface-datastore": "~0.6.0", + "err-code": "^2.0.0", + "interface-datastore": "~0.7.0", "multibase": "~0.6.0" }, "devDependencies": { - "aegir": "^18.1.1", + "aegir": "^20.0.0", "chai": "^4.2.0", + "delay": "^4.3.0", "detect-node": "^2.0.4", "dirty-chai": "^2.0.1", - "ipfs": "~0.34.4", - "ipfsd-ctl": "~0.42.0", - "libp2p-record": "~0.6.2", - "sinon": "^7.2.4" + "ipfs": "~0.37.0", + "ipfs-http-client": "^33.0.0", + "ipfsd-ctl": "^0.47.2", + "libp2p-record": "~0.7.0", + "promisify-es6": "^1.0.3", + "sinon": "^7.4.1" }, "contributors": [ "Vasco Santos ", diff --git a/src/index.js b/src/index.js index 6ab2bd6..258313d 100644 --- a/src/index.js +++ b/src/index.js @@ -4,7 +4,6 @@ const { Key } = require('interface-datastore') const { encodeBase32, keyToTopic, topicToKey } = require('./utils') const errcode = require('err-code') -const assert = require('assert') const debug = require('debug') const log = debug('datastore-pubsub:publisher') log.error = debug('datastore-pubsub:publisher:error') @@ -24,10 +23,21 @@ class DatastorePubsub { * @memberof DatastorePubsub */ constructor (pubsub, datastore, peerId, validator, subscriptionKeyFn) { - assert.strictEqual(typeof validator, 'object', 'missing validator') - assert.strictEqual(typeof validator.validate, 'function', 'missing validate function') - assert.strictEqual(typeof validator.select, 'function', 'missing select function') - subscriptionKeyFn && assert.strictEqual(typeof subscriptionKeyFn, 'function', 'invalid subscriptionKeyFn received') + if (!validator) { + throw errcode(new TypeError('missing validator'), 'ERR_INVALID_PARAMETERS') + } + + if (typeof validator.validate !== 'function') { + throw errcode(new TypeError('missing validate function'), 'ERR_INVALID_PARAMETERS') + } + + if (typeof validator.select !== 'function') { + throw errcode(new TypeError('missing select function'), 'ERR_INVALID_PARAMETERS') + } + + if (subscriptionKeyFn && typeof subscriptionKeyFn !== 'function') { + throw errcode(new TypeError('invalid subscriptionKeyFn received'), 'ERR_INVALID_PARAMETERS') + } this._pubsub = pubsub this._datastore = datastore @@ -43,22 +53,21 @@ class DatastorePubsub { * Publishes a value through pubsub. * @param {Buffer} key identifier of the value to be published. * @param {Buffer} val value to be propagated. - * @param {function(Error)} callback - * @returns {void} + * @returns {Promise} */ - put (key, val, callback) { + async put (key, val) { // eslint-disable-line require-await if (!Buffer.isBuffer(key)) { - const errMsg = `datastore key does not have a valid format` + const errMsg = 'datastore key does not have a valid format' log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY')) + throw errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY') } if (!Buffer.isBuffer(val)) { - const errMsg = `received value is not a buffer` + const errMsg = 'received value is not a buffer' log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_INVALID_VALUE_RECEIVED')) + throw errcode(new Error(errMsg), 'ERR_INVALID_VALUE_RECEIVED') } const stringifiedTopic = keyToTopic(key) @@ -66,48 +75,42 @@ class DatastorePubsub { log(`publish value for topic ${stringifiedTopic}`) // Publish record to pubsub - this._pubsub.publish(stringifiedTopic, val, callback) + return this._pubsub.publish(stringifiedTopic, val) } /** * Try to subscribe a topic with Pubsub and returns the local value if available. * @param {Buffer} key identifier of the value to be subscribed. - * @param {function(Error, Buffer)} callback - * @returns {void} + * @returns {Promise} */ - get (key, callback) { + async get (key) { if (!Buffer.isBuffer(key)) { - const errMsg = `datastore key does not have a valid format` + const errMsg = 'datastore key does not have a valid format' log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY')) + throw errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY') } const stringifiedTopic = keyToTopic(key) + const subscriptions = await this._pubsub.ls() - this._pubsub.ls((err, res) => { - if (err) { - return callback(err) - } - - // If already subscribed, just try to get it - if (res && Array.isArray(res) && res.indexOf(stringifiedTopic) > -1) { - return this._getLocal(key, callback) - } + // If already subscribed, just try to get it + if (subscriptions && Array.isArray(subscriptions) && subscriptions.indexOf(stringifiedTopic) > -1) { + return this._getLocal(key) + } - // Subscribe - this._pubsub.subscribe(stringifiedTopic, this._onMessage, (err) => { - if (err) { - const errMsg = `cannot subscribe topic ${stringifiedTopic}` + // subscribe + try { + await this._pubsub.subscribe(stringifiedTopic, this._onMessage) + } catch (err) { + const errMsg = `cannot subscribe topic ${stringifiedTopic}` - log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_SUBSCRIBING_TOPIC')) - } - log(`subscribed values for key ${stringifiedTopic}`) + log.error(errMsg) + throw errcode(new Error(errMsg), 'ERR_SUBSCRIBING_TOPIC') + } + log(`subscribed values for key ${stringifiedTopic}`) - this._getLocal(key, callback) - }) - }) + return this._getLocal(key) } /** @@ -118,41 +121,42 @@ class DatastorePubsub { unsubscribe (key) { const stringifiedTopic = keyToTopic(key) - this._pubsub.unsubscribe(stringifiedTopic, this._onMessage) + return this._pubsub.unsubscribe(stringifiedTopic, this._onMessage) } // Get record from local datastore - _getLocal (key, callback) { + async _getLocal (key) { // encode key - base32(/ipns/{cid}) const routingKey = new Key('/' + encodeBase32(key), false) + let dsVal - this._datastore.get(routingKey, (err, dsVal) => { - if (err) { - if (err.code !== 'ERR_NOT_FOUND') { - const errMsg = `unexpected error getting the ipns record for ${routingKey.toString()}` - - log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_UNEXPECTED_ERROR_GETTING_RECORD')) - } - const errMsg = `local record requested was not found for ${routingKey.toString()}` + try { + dsVal = await this._datastore.get(routingKey) + } catch (err) { + if (err.code !== 'ERR_NOT_FOUND') { + const errMsg = `unexpected error getting the ipns record for ${routingKey.toString()}` log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_NOT_FOUND')) + throw errcode(new Error(errMsg), 'ERR_UNEXPECTED_ERROR_GETTING_RECORD') } + const errMsg = `local record requested was not found for ${routingKey.toString()}` - if (!Buffer.isBuffer(dsVal)) { - const errMsg = `found record that we couldn't convert to a value` + log.error(errMsg) + throw errcode(new Error(errMsg), 'ERR_NOT_FOUND') + } - log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_INVALID_RECORD_RECEIVED')) - } + if (!Buffer.isBuffer(dsVal)) { + const errMsg = 'found record that we couldn\'t convert to a value' - callback(null, dsVal) - }) + log.error(errMsg) + throw errcode(new Error(errMsg), 'ERR_INVALID_RECORD_RECEIVED') + } + + return dsVal } // handles pubsub subscription messages - _onMessage (msg) { + async _onMessage (msg) { const { data, from, topicIDs } = msg let key try { @@ -166,135 +170,145 @@ class DatastorePubsub { // Stop if the message is from the peer (it already stored it while publishing to pubsub) if (from === this._peerId.toB58String()) { - log(`message discarded as it is from the same peer`) + log('message discarded as it is from the same peer') return } if (this._handleSubscriptionKeyFn) { - this._handleSubscriptionKeyFn(key, (err, res) => { - if (err) { - log.error('message discarded by the subscriptionKeyFn') - return - } - - this._storeIfSubscriptionIsBetter(res, data) - }) - } else { - this._storeIfSubscriptionIsBetter(key, data) + let res + + try { + res = await this._handleSubscriptionKeyFn(key) + } catch (err) { + log.error('message discarded by the subscriptionKeyFn') + return + } + + key = res + } + + try { + await this._storeIfSubscriptionIsBetter(key, data) + } catch (err) { + log.error(err) } } // Store the received record if it is better than the current stored - _storeIfSubscriptionIsBetter (key, data) { - this._isBetter(key, data, (err, res) => { - if (!err && res) { - this._storeRecord(Buffer.from(key), data) + async _storeIfSubscriptionIsBetter (key, data) { + let isBetter = false + + try { + isBetter = await this._isBetter(key, data) + } catch (err) { + if (err.code !== 'ERR_NOT_VALID_RECORD') { + throw err } - }) + } + + if (isBetter) { + await this._storeRecord(Buffer.from(key), data) + } } // Validate record according to the received validation function - _validateRecord (value, peerId, callback) { - this._validator.validate(value, peerId, callback) + async _validateRecord (value, peerId) { // eslint-disable-line require-await + return this._validator.validate(value, peerId) } // Select the best record according to the received select function. - _selectRecord (receivedRecord, currentRecord, callback) { - this._validator.select(receivedRecord, currentRecord, (err, res) => { - if (err) { - log.error(err) - return callback(err) - } + async _selectRecord (receivedRecord, currentRecord) { + const res = await this._validator.select(receivedRecord, currentRecord) - // If the selected was the first (0), it should be stored (true) - callback(null, res === 0) - }) + // If the selected was the first (0), it should be stored (true) + return res === 0 } // Verify if the record received through pubsub is valid and better than the one currently stored - _isBetter (key, val, callback) { + async _isBetter (key, val) { // validate received record - this._validateRecord(val, key, (err, valid) => { - // If not valid, it is not better than the one currently available - if (err || !valid) { - const errMsg = 'record received through pubsub is not valid' + let error, valid - log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_NOT_VALID_RECORD')) - } + try { + valid = await this._validateRecord(val, key) + } catch (err) { + error = err + } - // Get Local record - const dsKey = new Key(key) + // If not valid, it is not better than the one currently available + if (error || !valid) { + const errMsg = 'record received through pubsub is not valid' - this._getLocal(dsKey.toBuffer(), (err, currentRecord) => { - // if the old one is invalid, the new one is *always* better - if (err) { - return callback(null, true) - } + log.error(errMsg) + throw errcode(new Error(errMsg), 'ERR_NOT_VALID_RECORD') + } + + // Get Local record + const dsKey = new Key(key) + let currentRecord + + try { + currentRecord = await this._getLocal(dsKey.toBuffer()) + } catch (err) { + // if the old one is invalid, the new one is *always* better + return true + } - // if the same record, do not need to store - if (currentRecord.equals(val)) { - return callback(null, false) - } + // if the same record, do not need to store + if (currentRecord.equals(val)) { + return false + } - // verify if the received record should replace the current one - this._selectRecord(val, currentRecord, callback) - }) - }) + // verify if the received record should replace the current one + return this._selectRecord(val, currentRecord) } // add record to datastore - _storeRecord (key, data) { + async _storeRecord (key, data) { // encode key - base32(/ipns/{cid}) const routingKey = new Key('/' + encodeBase32(key), false) - this._datastore.put(routingKey, data, (err) => { - if (err) { - log.error(`record for ${key.toString()} could not be stored in the routing`) - return - } - - log(`record for ${key.toString()} was stored in the datastore`) - }) + await this._datastore.put(routingKey, data) + log(`record for ${key.toString()} was stored in the datastore`) } - open (callback) { - const errMsg = `open function was not implemented yet` + open () { + const errMsg = 'open function was not implemented yet' log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')) + throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET') } - has (key, callback) { - const errMsg = `has function was not implemented yet` + has (key) { + const errMsg = 'has function was not implemented yet' log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')) + throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET') } - delete (key, callback) { - const errMsg = `delete function was not implemented yet` + delete (key) { + const errMsg = 'delete function was not implemented yet' log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')) + throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET') } - close (callback) { - const errMsg = `close function was not implemented yet` + close () { + const errMsg = 'close function was not implemented yet' log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')) + throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET') } batch () { - const errMsg = `batch function was not implemented yet` + const errMsg = 'batch function was not implemented yet' log.error(errMsg) throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET') } query () { - const errMsg = `query function was not implemented yet` + const errMsg = 'query function was not implemented yet' log.error(errMsg) throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET') diff --git a/test/index.spec.js b/test/index.spec.js index 9d87f6a..53ecd24 100644 --- a/test/index.spec.js +++ b/test/index.spec.js @@ -6,10 +6,8 @@ const dirtyChai = require('dirty-chai') const expect = chai.expect chai.use(dirtyChai) const sinon = require('sinon') - +const errcode = require('err-code') const isNode = require('detect-node') -const parallel = require('async/parallel') -const series = require('async/series') const { Key } = require('interface-datastore') const { Record } = require('libp2p-record') @@ -17,15 +15,16 @@ const { Record } = require('libp2p-record') const DatastorePubsub = require('../src') const { keyToTopic } = require('../src/utils') const { connect, waitFor, waitForPeerToSubscribe, spawnDaemon, stopDaemon } = require('./utils') +const promisify = require('promisify-es6') // Always returning the expected values // Valid record and select the new one const smoothValidator = { - validate: (data, peerId, callback) => { - callback(null, true) + validate: () => { + return true }, - select: (receivedRecod, currentRecord, callback) => { - callback(null, 0) + select: () => { + return 0 } } @@ -38,32 +37,6 @@ describe('datastore-pubsub', function () { let ipfsdB = null let ipfsdAId = null let ipfsdBId = null - - // spawn daemon - before(function (done) { - parallel([ - (cb) => spawnDaemon(cb), - (cb) => spawnDaemon(cb) - ], (err, daemons) => { - expect(err).to.not.exist() - - ipfsdA = daemons[0] - ipfsdB = daemons[1] - - parallel([ - (cb) => ipfsdA.api.id(cb), - (cb) => ipfsdB.api.id(cb) - ], (err, ids) => { - expect(err).to.not.exist() - - ipfsdAId = ids[0] - ipfsdBId = ids[1] - - connect(ipfsdA, ipfsdAId, ipfsdB, ipfsdBId, done) - }) - }) - }) - let pubsubA = null let datastoreA = null let peerIdA = null @@ -72,17 +45,34 @@ describe('datastore-pubsub', function () { let peerIdB = null let pubsubB = null - // create DatastorePubsub instances - before(function (done) { + // spawn daemon and create DatastorePubsub instances + before(async function () { + [ipfsdA, ipfsdB] = await Promise.all([spawnDaemon(), spawnDaemon()]); + [ipfsdAId, ipfsdBId] = await Promise.all([ipfsdA.api.id(), ipfsdB.api.id()]) + + await connect(ipfsdA, ipfsdAId, ipfsdB, ipfsdBId) + pubsubA = ipfsdA.api.pubsub - datastoreA = ipfsdA.api._repo.datastore + datastoreA = { + get: promisify(ipfsdA.api._repo.datastore.get, { + context: ipfsdA.api._repo.datastore + }), + put: promisify(ipfsdA.api._repo.datastore.put, { + context: ipfsdA.api._repo.datastore + }) + } peerIdA = ipfsdA.api._peerInfo.id pubsubB = ipfsdB.api.pubsub - datastoreB = ipfsdB.api._repo.datastore + datastoreB = { + get: promisify(ipfsdB.api._repo.datastore.get, { + context: ipfsdB.api._repo.datastore + }), + put: promisify(ipfsdB.api._repo.datastore.put, { + context: ipfsdB.api._repo.datastore + }) + } peerIdB = ipfsdB.api._peerInfo.id - - done() }) const value = 'value' @@ -93,81 +83,77 @@ describe('datastore-pubsub', function () { let serializedRecord = null // prepare Record - beforeEach(function (done) { + beforeEach(() => { keyRef = `key${testCounter}` key = (new Key(keyRef)).toBuffer() record = new Record(key, Buffer.from(value)) serializedRecord = record.serialize() - done() }) - afterEach(function (done) { + afterEach(() => { ++testCounter - done() }) - after(function (done) { - parallel([ - (cb) => stopDaemon(ipfsdA, cb), - (cb) => stopDaemon(ipfsdB, cb) - ], done) + after(() => { + return Promise.all([ + stopDaemon(ipfsdA), + stopDaemon(ipfsdB) + ]) }) - it('should subscribe the topic, but receive error as no entry is stored locally', function (done) { + it('should subscribe the topic, but receive error as no entry is stored locally', async () => { const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) const subsTopic = keyToTopic(`/${keyRef}`) - pubsubA.ls((err, res) => { - expect(err).to.not.exist() - expect(res).to.exist() - expect(res).to.not.include(subsTopic) // not subscribed key reference yet + let subscribers = await pubsubA.ls() - dsPubsubA.get(key, (err) => { - expect(err).to.exist() // not locally stored record - expect(err.code).to.equal('ERR_NOT_FOUND') + expect(subscribers).to.exist() + expect(subscribers).to.not.include(subsTopic) // not subscribed key reference yet - pubsubA.ls((err, res) => { - expect(err).to.not.exist() - expect(res).to.exist() - expect(res).to.include(subsTopic) // subscribed key reference - done() - }) + const res = await dsPubsubA.get(key) + .then(() => expect.fail('Should have failed to fetch key'), (err) => { + // not locally stored record + expect(err.code).to.equal('ERR_NOT_FOUND') }) - }) + + expect(res).to.not.exist() + + subscribers = await pubsubA.ls() + + expect(subscribers).to.exist() + expect(subscribers).to.include(subsTopic) // subscribed key reference }) - it('should put correctly to daemon A and daemon B should not receive it without subscribing', function (done) { + it('should put correctly to daemon A and daemon B should not receive it without subscribing', async () => { const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator) const subsTopic = keyToTopic(`/${keyRef}`) - pubsubB.ls((err, res) => { - expect(err).to.not.exist() - expect(res).to.exist() - expect(res).to.not.include(subsTopic) // not subscribed - - dsPubsubA.put(key, serializedRecord, (err) => { - expect(err).to.not.exist() - dsPubsubB.get(key, (err, res) => { - expect(err).to.exist() // did not receive record - expect(res).to.not.exist() - done() - }) + const res = await pubsubB.ls() + expect(res).to.exist() + expect(res).to.not.include(subsTopic) // not subscribed + + await dsPubsubA.put(key, serializedRecord) + + await dsPubsubB.get(key) + .then(() => expect.fail('Should have failed to fetch key'), (err) => { + // not locally stored record + expect(err.code).to.equal('ERR_NOT_FOUND') }) - }) }) - it('should validate if record content is the same', function (done) { + it('should validate if record content is the same', async () => { const customValidator = { - validate: (data, peerId, callback) => { + validate: (data) => { const receivedRecord = Record.deserialize(data) expect(receivedRecord.value.toString()).to.equal(value) // validator should deserialize correctly - callback(null, receivedRecord.value.toString() === value) + + return receivedRecord.value.toString() === value }, - select: (receivedRecod, currentRecord, callback) => { - callback(null, 0) + select: () => { + return 0 } } const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) @@ -179,28 +165,30 @@ describe('datastore-pubsub', function () { receivedMessage = true } - dsPubsubB.get(key, (err, res) => { - expect(err).to.exist() - expect(res).to.not.exist() // no value available, but subscribed now - - series([ - (cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb), - // subscribe in order to understand when the message arrive to the node - (cb) => pubsubB.subscribe(subsTopic, messageHandler, cb), - (cb) => dsPubsubA.put(key, serializedRecord, cb), - // wait until message arrives - (cb) => waitFor(() => receivedMessage === true, cb), - // get from datastore - (cb) => dsPubsubB.get(key, cb) - ], (err, res) => { - expect(err).to.not.exist() - expect(res[4]).to.exist() - done() + // causes pubsub b to become subscribed to the topic + await dsPubsubB.get(key) + .then(() => expect.fail('Should have failed to fetch key'), (err) => { + // not locally stored record + expect(err.code).to.equal('ERR_NOT_FOUND') }) - }) + + await waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA) + + // subscribe in order to understand when the message arrive to the node + await pubsubB.subscribe(subsTopic, messageHandler) + + await dsPubsubA.put(key, serializedRecord) + + // wait until message arrives + await waitFor(() => receivedMessage === true) + + // get from datastore + const record = await dsPubsubB.get(key) + + expect(record).to.be.ok() }) - it('should put correctly to daemon A and daemon B should receive it as it tried to get it first and subscribed it', function (done) { + it('should put correctly to daemon A and daemon B should receive it as it tried to get it first and subscribed it', async () => { const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator) const subsTopic = keyToTopic(`/${keyRef}`) @@ -210,55 +198,50 @@ describe('datastore-pubsub', function () { receivedMessage = true } - pubsubB.ls((err, res) => { - expect(err).to.not.exist() - expect(res).to.exist() - expect(res).to.not.include(subsTopic) // not subscribed - - dsPubsubB.get(key, (err, res) => { - expect(err).to.exist() - expect(res).to.not.exist() // not value available, but subscribed now - - series([ - (cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb), - // subscribe in order to understand when the message arrive to the node - (cb) => pubsubB.subscribe(subsTopic, messageHandler, cb), - (cb) => dsPubsubA.put(key, serializedRecord, cb), - // wait until message arrives - (cb) => waitFor(() => receivedMessage === true, cb), - // get from datastore - (cb) => dsPubsubB.get(key, cb) - ], (err, res) => { - expect(err).to.not.exist() - expect(res).to.exist() - expect(res[4]).to.exist() - - const receivedRecord = Record.deserialize(res[4]) - - expect(receivedRecord.value.toString()).to.equal(value) - done() - }) + const res = await pubsubB.ls() + expect(res).to.exist() + expect(res).to.not.include(subsTopic) // not subscribed + + // causes pubsub b to become subscribed to the topic + await dsPubsubB.get(key) + .then(() => expect.fail('Should have failed to fetch key'), (err) => { + // not locally stored record + expect(err.code).to.equal('ERR_NOT_FOUND') }) - }) + + await waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA) + + // subscribe in order to understand when the message arrive to the node + await pubsubB.subscribe(subsTopic, messageHandler) + await dsPubsubA.put(key, serializedRecord) + + // wait until message arrives + await waitFor(() => receivedMessage === true) + + // get from datastore + const result = await dsPubsubB.get(key) + expect(result).to.exist() + + const receivedRecord = Record.deserialize(result) + expect(receivedRecord.value.toString()).to.equal(value) }) - it('should fail to create the DatastorePubsub if no validator is provided', function (done) { + it('should fail to create the DatastorePubsub if no validator is provided', () => { let dsPubsubB try { dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB) // no validator } catch (err) { - expect(err).to.exist() + expect(err.code).to.equal('ERR_INVALID_PARAMETERS') } expect(dsPubsubB).to.equal(undefined) - done() }) - it('should fail to create the DatastorePubsub if no validate function is provided', function (done) { + it('should fail to create the DatastorePubsub if no validate function is provided', () => { const customValidator = { validate: undefined, - select: (receivedRecod, currentRecord, callback) => { - callback(null, 0) + select: () => { + return 0 } } @@ -266,17 +249,16 @@ describe('datastore-pubsub', function () { try { dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator) } catch (err) { - expect(err).to.exist() + expect(err.code).to.equal('ERR_INVALID_PARAMETERS') } expect(dsPubsubB).to.equal(undefined) - done() }) - it('should fail to create the DatastorePubsub if no select function is provided', function (done) { + it('should fail to create the DatastorePubsub if no select function is provided', () => { const customValidator = { - validate: (data, peerId, callback) => { - callback(null, true) + validate: () => { + return true }, select: undefined } @@ -285,20 +267,19 @@ describe('datastore-pubsub', function () { try { dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator) } catch (err) { - expect(err).to.exist() + expect(err.code).to.equal('ERR_INVALID_PARAMETERS') } expect(dsPubsubB).to.equal(undefined) - done() }) - it('should fail if it fails to validate the record', function (done) { + it('should fail if it fails to validate the record', async () => { const customValidator = { - validate: (data, peerId, callback) => { - callback(null, false) // return false validation + validate: () => { + return false // return false validation }, - select: (receivedRecod, currentRecord, callback) => { - callback(null, 0) + select: () => { + return 0 } } const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) @@ -310,35 +291,39 @@ describe('datastore-pubsub', function () { receivedMessage = true } - dsPubsubB.get(key, (err, res) => { - expect(err).to.exist() - expect(res).to.not.exist() // not value available, but subscribed now - - series([ - (cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb), - // subscribe in order to understand when the message arrive to the node - (cb) => pubsubB.subscribe(subsTopic, messageHandler, cb), - (cb) => dsPubsubA.put(key, serializedRecord, cb), - // wait until message arrives - (cb) => waitFor(() => receivedMessage === true, cb), - // get from datastore - (cb) => dsPubsubB.get(key, cb) - ], (err, res) => { - // No record received, in spite of message received - expect(err).to.exist() // message was discarded as a result of failing the validation - expect(res[4]).to.not.exist() - done() + // causes pubsub b to become subscribed to the topic + await dsPubsubB.get(key) + .then(() => expect.fail('Should have failed to fetch key'), (err) => { + // not locally stored record + expect(err.code).to.equal('ERR_NOT_FOUND') }) - }) + + await waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA) + + // subscribe in order to understand when the message arrive to the node + await pubsubB.subscribe(subsTopic, messageHandler) + await dsPubsubA.put(key, serializedRecord) + + // wait until message arrives + await waitFor(() => receivedMessage === true) + + try { + // get from datastore + await dsPubsubB.get(key) + expect.fail('Should have disguarded invalid message') + } catch (err) { + // No record received, in spite of message received + expect(err.code).to.equal('ERR_NOT_FOUND') + } }) - it('should get the second record if the selector selects it as the newest one', function (done) { + it('should get the second record if the selector selects it as the newest one', async () => { const customValidator = { - validate: (data, peerId, callback) => { - callback(null, true) + validate: () => { + return true }, - select: (receivedRecod, currentRecord, callback) => { - callback(null, 1) // current record is the newer + select: () => { + return 1 // current record is the newer } } @@ -355,41 +340,40 @@ describe('datastore-pubsub', function () { receivedMessage = true } - dsPubsubB.get(key, (err, res) => { - expect(err).to.exist() - expect(res).to.not.exist() // not value available, but subscribed now - - series([ - (cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb), - // subscribe in order to understand when the message arrive to the node - (cb) => pubsubB.subscribe(subsTopic, messageHandler, cb), - (cb) => dsPubsubA.put(key, serializedRecord, cb), - // wait until message arrives - (cb) => waitFor(() => receivedMessage === true, cb), - (cb) => dsPubsubA.put(key, newSerializedRecord, cb), // put new serializedRecord - // wait until message arrives - (cb) => waitFor(() => receivedMessage === true, cb), - // get from datastore - (cb) => dsPubsubB.get(key, cb) - ], (err, res) => { - expect(err).to.not.exist() // message was discarded as a result of no validator available - expect(res[6]).to.exist() - - const receivedRecord = Record.deserialize(res[6]) - - expect(receivedRecord.value.toString()).to.not.equal(newValue) // not equal to the last value - done() + // causes pubsub b to become subscribed to the topic + await dsPubsubB.get(key) + .then(() => expect.fail('Should have failed to fetch key'), (err) => { + // not locally stored record + expect(err.code).to.equal('ERR_NOT_FOUND') }) - }) + + await waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA) + + // subscribe in order to understand when the message arrive to the node + await pubsubB.subscribe(subsTopic, messageHandler) + await dsPubsubA.put(key, serializedRecord) + + // wait until message arrives + await waitFor(() => receivedMessage === true) + await dsPubsubA.put(key, newSerializedRecord) // put new serializedRecord + + // wait until message arrives + await waitFor(() => receivedMessage === true) + + // get from datastore + // message was discarded as a result of no validator available + const result = await dsPubsubB.get(key) + const receivedRecord = Record.deserialize(result) + expect(receivedRecord.value.toString()).to.not.equal(newValue) // not equal to the last value }) - it('should get the new record if the selector selects it as the newest one', function (done) { + it('should get the new record if the selector selects it as the newest one', async () => { const customValidator = { - validate: (data, peerId, callback) => { - callback(null, true) + validate: () => { + return true }, - select: (receivedRecod, currentRecord, callback) => { - callback(null, 0) // received record is the newer + select: () => { + return 0 // received record is the newer } } @@ -406,38 +390,45 @@ describe('datastore-pubsub', function () { receivedMessage = true } - dsPubsubB.get(key, (err, res) => { - expect(err).to.exist() - expect(res).to.not.exist() // not value available, but it is subscribed now - - series([ - (cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb), - // subscribe in order to understand when the message arrive to the node - (cb) => pubsubB.subscribe(subsTopic, messageHandler, cb), - (cb) => dsPubsubA.put(key, serializedRecord, cb), - // wait until message arrives - (cb) => waitFor(() => receivedMessage === true, cb), - (cb) => dsPubsubA.put(key, newSerializedRecord, cb), // put new serializedRecord - // wait until message arrives - (cb) => waitFor(() => receivedMessage === true, cb), - // get from datastore - (cb) => dsPubsubB.get(key, cb) - ], (err, res) => { - expect(err).to.not.exist() // message was discarded as a result of no validator available - expect(res[6]).to.exist() - - const receivedRecord = Record.deserialize(res[6]) - - expect(receivedRecord.value.toString()).to.equal(newValue) // equal to the last value - done() + // causes pubsub b to become subscribed to the topic + await dsPubsubB.get(key) + .then(() => expect.fail('Should have failed to fetch key'), (err) => { + // not locally stored record + expect(err.code).to.equal('ERR_NOT_FOUND') }) - }) + + await waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA) + + // subscribe in order to understand when the message arrive to the node + await pubsubB.subscribe(subsTopic, messageHandler) + await dsPubsubA.put(key, serializedRecord) + + // wait until message arrives + await waitFor(() => receivedMessage === true) + + // reset message wait + receivedMessage = false + + // put new serializedRecord + await dsPubsubA.put(key, newSerializedRecord) + + // wait until second message arrives + await waitFor(() => receivedMessage === true) + + // get from datastore + const result = await dsPubsubB.get(key) + + // message was discarded as a result of no validator available + const receivedRecord = Record.deserialize(result) + + // equal to the last value + expect(receivedRecord.value.toString()).to.equal(newValue) }) - it('should subscribe the topic and after a message being received, discard it using the subscriptionKeyFn', function (done) { - const subscriptionKeyFn = (topic, callback) => { + it('should subscribe the topic and after a message being received, discard it using the subscriptionKeyFn', async () => { + const subscriptionKeyFn = (topic) => { expect(topic).to.equal(`/${keyRef}`) - callback(new Error('DISCARD MESSAGE')) + throw new Error('DISCARD MESSAGE') } const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn) @@ -448,36 +439,39 @@ describe('datastore-pubsub', function () { receivedMessage = true } - pubsubB.ls((err, res) => { - expect(err).to.not.exist() - expect(res).to.exist() - expect(res).to.not.include(subsTopic) // not subscribed - - dsPubsubB.get(key, (err, res) => { - expect(err).to.exist() - expect(res).to.not.exist() // not value available, but subscribed now - - series([ - (cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb), - // subscribe in order to understand when the message arrive to the node - (cb) => pubsubB.subscribe(subsTopic, messageHandler, cb), - (cb) => dsPubsubA.put(key, serializedRecord, cb), - // wait until message arrives - (cb) => waitFor(() => receivedMessage === true, cb), - // get from datastore - (cb) => dsPubsubB.get(key, cb) - ], (err) => { - expect(err).to.exist() // As message was discarded, it was not stored in the datastore - done() - }) + const res = await pubsubB.ls() + expect(res).to.not.include(subsTopic) // not subscribed + + // causes pubsub b to become subscribed to the topic + await dsPubsubB.get(key) + .then(() => expect.fail('Should have failed to fetch key'), (err) => { + // not locally stored record + expect(err.code).to.equal('ERR_NOT_FOUND') }) - }) + + await waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA) + + // subscribe in order to understand when the message arrive to the node + await pubsubB.subscribe(subsTopic, messageHandler) + await dsPubsubA.put(key, serializedRecord) + + // wait until message arrives + await waitFor(() => receivedMessage === true) + + // get from datastore + try { + await dsPubsubB.get(key) + expect.fail('Should not have stored message') + } catch (err) { + // As message was discarded, it was not stored in the datastore + expect(err.code).to.equal('ERR_NOT_FOUND') + } }) - it('should subscribe the topic and after a message being received, change its key using subscriptionKeyFn', function (done) { - const subscriptionKeyFn = (topic, callback) => { + it('should subscribe the topic and after a message being received, change its key using subscriptionKeyFn', async () => { + const subscriptionKeyFn = (topic) => { expect(topic).to.equal(key.toString()) - callback(null, `${topic}new`) + return `${topic}new` } const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn) @@ -489,64 +483,66 @@ describe('datastore-pubsub', function () { receivedMessage = true } - pubsubB.ls((err, res) => { - expect(err).to.not.exist() - expect(res).to.exist() - expect(res).to.not.include(subsTopic) // not subscribed - - dsPubsubB.get(key, (err, res) => { - expect(err).to.exist() - expect(res).to.not.exist() // not value available, but subscribed now - - series([ - (cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb), - // subscribe in order to understand when the message arrive to the node - (cb) => pubsubB.subscribe(subsTopic, messageHandler, cb), - (cb) => dsPubsubA.put(key, serializedRecord, cb), - // wait until message arrives - (cb) => waitFor(() => receivedMessage === true, cb), - // get from datastore - (cb) => dsPubsubB.get(keyNew, cb) - ], (err, res) => { - expect(err).to.not.exist() - expect(res).to.exist() - expect(res[4]).to.exist() - - const receivedRecord = Record.deserialize(res[4]) - - expect(receivedRecord.value.toString()).to.equal(value) - done() - }) + const res = await pubsubB.ls() + expect(res).to.not.include(subsTopic) // not subscribed + + // causes pubsub b to become subscribed to the topic + await dsPubsubB.get(key) + .then(() => expect.fail('Should have failed to fetch key'), (err) => { + // not locally stored record + expect(err.code).to.equal('ERR_NOT_FOUND') }) - }) + + await waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA) + + // subscribe in order to understand when the message arrive to the node + await pubsubB.subscribe(subsTopic, messageHandler) + await dsPubsubA.put(key, serializedRecord) + + // wait until message arrives + await waitFor(() => receivedMessage === true) + + // get from datastore + const result = await dsPubsubB.get(keyNew) + const receivedRecord = Record.deserialize(result) + + expect(receivedRecord.value.toString()).to.equal(value) }) - it('should subscribe a topic only once', function (done) { + it('should subscribe a topic only once', async () => { const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) sinon.spy(pubsubA, 'subscribe') - dsPubsubA.get(key, (err) => { - expect(err).to.exist() // not locally stored record - dsPubsubA.get(key, (err) => { - expect(err).to.exist() // not locally stored record - expect(pubsubA.subscribe.calledOnce).to.equal(true) + // causes pubsub b to become subscribed to the topic + await dsPubsubA.get(key) + .then(() => expect.fail('Should have failed to fetch key'), (err) => { + // not locally stored record + expect(err.code).to.equal('ERR_NOT_FOUND') + }) - done() + // causes pubsub b to become subscribed to the topic + await dsPubsubA.get(key) + .then(() => expect.fail('Should have failed to fetch key'), (err) => { + // not locally stored record + expect(err.code).to.equal('ERR_NOT_FOUND') }) - }) + + expect(pubsubA.subscribe.calledOnce).to.equal(true) }) - it('should handle a unexpected error properly when getting from the datastore', function (done) { + it('should handle a unexpected error properly when getting from the datastore', async () => { const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) - const stub = sinon.stub(dsPubsubA._datastore, 'get').callsArgWith(1, { code: 'RANDOM_ERR' }) - - dsPubsubA.get(key, (err) => { - expect(err).to.exist() // not locally stored record - expect(err.code).to.equal('ERR_UNEXPECTED_ERROR_GETTING_RECORD') + const stub = sinon.stub(dsPubsubA._datastore, 'get').throws(errcode(new Error('Wut'), 'RANDOM_ERR')) - stub.restore() - done() - }) + // causes pubsub b to become subscribed to the topic + await dsPubsubA.get(key) + .then(() => expect.fail('Should have failed to fetch key'), (err) => { + // not locally stored record + expect(err.code).to.equal('ERR_UNEXPECTED_ERROR_GETTING_RECORD') + }) + .finally(() => { + stub.restore() + }) }) }) diff --git a/test/utils.js b/test/utils.js index 0ba71ca..cd52cdf 100644 --- a/test/utils.js +++ b/test/utils.js @@ -2,10 +2,7 @@ const ipfs = require('ipfs') const DaemonFactory = require('ipfsd-ctl') - -const parallel = require('async/parallel') -const retry = require('async/retry') -const series = require('async/series') +const delay = require('delay') const config = { Addresses: { @@ -16,69 +13,65 @@ const config = { } // spawn a daemon -const spawnDaemon = (callback) => { - DaemonFactory.create({ exec: ipfs, type: 'proc' }) - .spawn({ - args: ['--enable-pubsub-experiment'], - disposable: true, - bits: 512, - config - }, callback) +const spawnDaemon = () => { + const d = DaemonFactory.create({ + exec: ipfs, + type: 'proc', + IpfsApi: require('ipfs-http-client') + }) + + return d.spawn({ + disposable: true, + bits: 512, + config, + EXPERIMENTAL: { + pubsub: true + } + }) } // stop a daemon -const stopDaemon = (daemon, callback) => { - series([ - (cb) => daemon.stop(cb), - (cb) => setTimeout(cb, 200), - (cb) => daemon.cleanup(cb) - ], callback) +const stopDaemon = async (daemon) => { + await daemon.stop() + await new Promise((resolve) => setTimeout(() => resolve(), 200)) + return daemon.cleanup() } // connect two peers -const connect = (dA, dAId, dB, dBId, callback) => { +const connect = (dA, dAId, dB, dBId) => { const dALocalAddress = dAId.addresses.find(a => a.includes('127.0.0.1')) const dBLocalAddress = dBId.addresses.find(a => a.includes('127.0.0.1')) - parallel([ - (cb) => dA.api.swarm.connect(dBLocalAddress, cb), - (cb) => dB.api.swarm.connect(dALocalAddress, cb) - ], callback) + return Promise.all([ + dA.api.swarm.connect(dBLocalAddress), + dB.api.swarm.connect(dALocalAddress) + ]) } // Wait for a condition to become true. When its true, callback is called. -const waitFor = (predicate, callback) => { - const ttl = Date.now() + (10 * 1000) - const self = setInterval(() => { - if (predicate()) { - clearInterval(self) - return callback() +const waitFor = async (predicate) => { + for (let i = 0; i < 10; i++) { + if (await predicate()) { + return } - if (Date.now() > ttl) { - clearInterval(self) - return callback(new Error('waitFor time expired')) - } - }, 500) + + await delay(1000) + } + + throw new Error('waitFor time expired') } // Wait until a peer subscribes a topic -const waitForPeerToSubscribe = (topic, peer, daemon, callback) => { - retry({ - times: 5, - interval: 1000 - }, (next) => { - daemon.api.pubsub.peers(topic, (error, peers) => { - if (error) { - return next(error) - } +const waitForPeerToSubscribe = async (topic, peer, daemon) => { + for (let i = 0; i < 5; i++) { + const peers = await daemon.api.pubsub.peers(topic) - if (!peers.includes(peer.id)) { - return next(new Error(`Could not find peer ${peer.id}`)) - } + if (peers.includes(peer.id)) { + return + } - return next() - }) - }, callback) + await delay(1000) + } } module.exports = {