Skip to content

feat: add topic validators to pubsub #75

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 61 additions & 9 deletions src/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ class BasicPubSub extends Pubsub {
* @returns {string} message id as string
*/
this.defaultMsgIdFn = (msg) => utils.msgId(msg.from, msg.seqno)

/**
* Topic validator function
* @typedef {function(topic: string, peer: Peer, message: RPC): boolean} validator
*/

/**
* Topic validator map
*
* Keyed by topic
* Topic validators are functions with the following input:
* @type {Map<string, validator>}
*/
this.topicValidators = new Map()
}

/**
Expand Down Expand Up @@ -147,17 +161,14 @@ class BasicPubSub extends Pubsub {
const msg = utils.normalizeInRpcMessage(message)

// Ensure the message is valid before processing it
let isValid
let error

try {
isValid = await this.validate(message)
const isValid = await this.validate(message, peer)
if (!isValid) {
this.log('Message is invalid, dropping it.')
return
}
} catch (err) {
error = err
}

if (error || !isValid) {
this.log('Message could not be validated, dropping it. isValid=%s', isValid, error)
this.log('Error in message validation, dropping it. %O', err)
return
}

Expand All @@ -166,6 +177,47 @@ class BasicPubSub extends Pubsub {
}
}

/**
* Validates the given message.
* @param {RPC.Message} message
* @param {Peer} [peer]
* @returns {Promise<Boolean>}
*/
async validate (message, peer) {
const isValid = await super.validate(message, peer)
if (!isValid) {
return false
}
// only run topic validators if the peer is passed as an arg
if (!peer) {
return true
}
return message.topicIDs.every(topic => {
const validatorFn = this.topicValidators.get(topic)
if (!validatorFn) {
return true
}
return this._processTopicValidatorResult(topic, peer, message, validatorFn(topic, peer, message))
})
}

/**
* Coerces topic validator result to determine message validity
*
* Defaults to true if truthy
*
* Override this method to provide custom topic validator result processing (eg: scoring)
*
* @param {String} topic
* @param {Peer} peer
* @param {RPC.Message} message
* @param {unknown} result
* @returns {Boolean}
*/
_processTopicValidatorResult (topic, peer, message, result) {
return Boolean(result)
}

/**
* Handles an subscription change from a peer
*
Expand Down
14 changes: 9 additions & 5 deletions test/2-nodes.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,16 @@ describe('2 nodes', () => {

it('Subscribe to a topic', async () => {
const topic = 'Z'
await new Promise((resolve) => setTimeout(resolve, 2000))
nodes[0].subscribe(topic)
nodes[1].subscribe(topic)

// await subscription change
const [changedPeerId, changedTopics, changedSubs] = await new Promise((resolve) => {
nodes[0].once('pubsub:subscription-change', (...args) => resolve(args))
})
const [evt0] = await Promise.all([
new Promise(resolve => nodes[0].once('pubsub:subscription-change', (...args) => resolve(args))),
new Promise(resolve => nodes[1].once('pubsub:subscription-change', (...args) => resolve(args)))
])

const [changedPeerId, changedTopics, changedSubs] = evt0

expectSet(nodes[0].subscriptions, [topic])
expectSet(nodes[1].subscriptions, [topic])
Expand Down Expand Up @@ -134,7 +136,9 @@ describe('2 nodes', () => {
nodes[1].subscribe(topic)

// await subscription change and heartbeat
await new Promise((resolve) => nodes[0].once('pubsub:subscription-change', resolve))
await Promise.all(
nodes.map(n => new Promise(resolve => n.once('pubsub:subscription-change', resolve)))
)
await Promise.all([
new Promise((resolve) => nodes[0].once('gossipsub:heartbeat', resolve)),
new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve))
Expand Down
71 changes: 71 additions & 0 deletions test/pubsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,75 @@ describe('Pubsub', () => {
await pWaitFor(() => gossipsub._onPeerDisconnected.calledWith(peerId), { timeout: 1000 })
})
})

describe('topic validators', () => {
it('should filter messages by topic validator', async () => {
// use processRpcMessage.callCount to see if a message is valid or not
// a valid message will trigger processRpcMessage
sinon.stub(gossipsub, '_processRpcMessage')
// Disable strict signing
sinon.stub(gossipsub, 'strictSigning').value(false)
const filteredTopic = 't'
const peerStr = 'QmAnotherPeer'
gossipsub.peers.set(peerStr, {})

// Set a trivial topic validator
gossipsub.topicValidators.set(filteredTopic, (topic, peer, message) => {
return message.data.equals(Buffer.from('a message'))
})

// valid case
const validRpc = {
subscriptions: [],
msgs: [{
from: gossipsub.peerId.id,
data: Buffer.from('a message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic]
}]
}

// process valid message
gossipsub._processRpc(peerStr, {}, validRpc)
await new Promise(resolve => setTimeout(resolve, 500))
expect(gossipsub._processRpcMessage.callCount).to.eql(1)

// invalid case
const invalidRpc = {
subscriptions: [],
msgs: [{
from: gossipsub.peerId.id,
data: Buffer.from('a different message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic]
}]
}

// process invalid message
gossipsub._processRpc(peerStr, {}, invalidRpc)
await new Promise(resolve => setTimeout(resolve, 500))
expect(gossipsub._processRpcMessage.callCount).to.eql(1)

// remove topic validator
gossipsub.topicValidators.delete(filteredTopic)

// another invalid case
const invalidRpc2 = {
subscriptions: [],
msgs: [{
from: gossipsub.peerId.id,
data: Buffer.from('a different message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic]
}]
}

// process previously invalid message, now is valid
gossipsub._processRpc(peerStr, {}, invalidRpc2)
await new Promise(resolve => setTimeout(resolve, 500))
expect(gossipsub._processRpcMessage.callCount).to.eql(2)
// cleanup
gossipsub.peers.delete(peerStr)
})
})
})