diff --git a/src/pubsub.js b/src/pubsub.js index e56ad38c..29de5803 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -61,6 +61,20 @@ class BasicPubSub extends Pubsub { * @returns {string} message id as string */ this.defaultMsgIdFn = (msg) => utils.msgId(msg.from, msg.seqno) + + /** + * Topic validator function + * @typedef {function(topic: string, peer: Peer, message: RPC): boolean} validator + */ + + /** + * Topic validator map + * + * Keyed by topic + * Topic validators are functions with the following input: + * @type {Map} + */ + this.topicValidators = new Map() } /** @@ -147,17 +161,14 @@ class BasicPubSub extends Pubsub { const msg = utils.normalizeInRpcMessage(message) // Ensure the message is valid before processing it - let isValid - let error - try { - isValid = await this.validate(message) + const isValid = await this.validate(message, peer) + if (!isValid) { + this.log('Message is invalid, dropping it.') + return + } } catch (err) { - error = err - } - - if (error || !isValid) { - this.log('Message could not be validated, dropping it. isValid=%s', isValid, error) + this.log('Error in message validation, dropping it. %O', err) return } @@ -166,6 +177,47 @@ class BasicPubSub extends Pubsub { } } + /** + * Validates the given message. + * @param {RPC.Message} message + * @param {Peer} [peer] + * @returns {Promise} + */ + async validate (message, peer) { + const isValid = await super.validate(message, peer) + if (!isValid) { + return false + } + // only run topic validators if the peer is passed as an arg + if (!peer) { + return true + } + return message.topicIDs.every(topic => { + const validatorFn = this.topicValidators.get(topic) + if (!validatorFn) { + return true + } + return this._processTopicValidatorResult(topic, peer, message, validatorFn(topic, peer, message)) + }) + } + + /** + * Coerces topic validator result to determine message validity + * + * Defaults to true if truthy + * + * Override this method to provide custom topic validator result processing (eg: scoring) + * + * @param {String} topic + * @param {Peer} peer + * @param {RPC.Message} message + * @param {unknown} result + * @returns {Boolean} + */ + _processTopicValidatorResult (topic, peer, message, result) { + return Boolean(result) + } + /** * Handles an subscription change from a peer * diff --git a/test/2-nodes.spec.js b/test/2-nodes.spec.js index e1e21861..5625864f 100644 --- a/test/2-nodes.spec.js +++ b/test/2-nodes.spec.js @@ -88,14 +88,16 @@ describe('2 nodes', () => { it('Subscribe to a topic', async () => { const topic = 'Z' - await new Promise((resolve) => setTimeout(resolve, 2000)) nodes[0].subscribe(topic) nodes[1].subscribe(topic) // await subscription change - const [changedPeerId, changedTopics, changedSubs] = await new Promise((resolve) => { - nodes[0].once('pubsub:subscription-change', (...args) => resolve(args)) - }) + const [evt0] = await Promise.all([ + new Promise(resolve => nodes[0].once('pubsub:subscription-change', (...args) => resolve(args))), + new Promise(resolve => nodes[1].once('pubsub:subscription-change', (...args) => resolve(args))) + ]) + + const [changedPeerId, changedTopics, changedSubs] = evt0 expectSet(nodes[0].subscriptions, [topic]) expectSet(nodes[1].subscriptions, [topic]) @@ -134,7 +136,9 @@ describe('2 nodes', () => { nodes[1].subscribe(topic) // await subscription change and heartbeat - await new Promise((resolve) => nodes[0].once('pubsub:subscription-change', resolve)) + await Promise.all( + nodes.map(n => new Promise(resolve => n.once('pubsub:subscription-change', resolve))) + ) await Promise.all([ new Promise((resolve) => nodes[0].once('gossipsub:heartbeat', resolve)), new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve)) diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 81530f05..d9991353 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -152,4 +152,75 @@ describe('Pubsub', () => { await pWaitFor(() => gossipsub._onPeerDisconnected.calledWith(peerId), { timeout: 1000 }) }) }) + + describe('topic validators', () => { + it('should filter messages by topic validator', async () => { + // use processRpcMessage.callCount to see if a message is valid or not + // a valid message will trigger processRpcMessage + sinon.stub(gossipsub, '_processRpcMessage') + // Disable strict signing + sinon.stub(gossipsub, 'strictSigning').value(false) + const filteredTopic = 't' + const peerStr = 'QmAnotherPeer' + gossipsub.peers.set(peerStr, {}) + + // Set a trivial topic validator + gossipsub.topicValidators.set(filteredTopic, (topic, peer, message) => { + return message.data.equals(Buffer.from('a message')) + }) + + // valid case + const validRpc = { + subscriptions: [], + msgs: [{ + from: gossipsub.peerId.id, + data: Buffer.from('a message'), + seqno: utils.randomSeqno(), + topicIDs: [filteredTopic] + }] + } + + // process valid message + gossipsub._processRpc(peerStr, {}, validRpc) + await new Promise(resolve => setTimeout(resolve, 500)) + expect(gossipsub._processRpcMessage.callCount).to.eql(1) + + // invalid case + const invalidRpc = { + subscriptions: [], + msgs: [{ + from: gossipsub.peerId.id, + data: Buffer.from('a different message'), + seqno: utils.randomSeqno(), + topicIDs: [filteredTopic] + }] + } + + // process invalid message + gossipsub._processRpc(peerStr, {}, invalidRpc) + await new Promise(resolve => setTimeout(resolve, 500)) + expect(gossipsub._processRpcMessage.callCount).to.eql(1) + + // remove topic validator + gossipsub.topicValidators.delete(filteredTopic) + + // another invalid case + const invalidRpc2 = { + subscriptions: [], + msgs: [{ + from: gossipsub.peerId.id, + data: Buffer.from('a different message'), + seqno: utils.randomSeqno(), + topicIDs: [filteredTopic] + }] + } + + // process previously invalid message, now is valid + gossipsub._processRpc(peerStr, {}, invalidRpc2) + await new Promise(resolve => setTimeout(resolve, 500)) + expect(gossipsub._processRpcMessage.callCount).to.eql(2) + // cleanup + gossipsub.peers.delete(peerStr) + }) + }) })