Skip to content

Commit db5afb5

Browse files
committed
feat: add topic validators to pubsub
1 parent acb5d28 commit db5afb5

File tree

3 files changed

+132
-15
lines changed

3 files changed

+132
-15
lines changed

src/pubsub.js

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,15 @@ class BasicPubSub extends Pubsub {
7878
* @returns {string} message id as string
7979
*/
8080
this.defaultMsgIdFn = (msg) => utils.msgId(msg.from, msg.seqno)
81+
82+
/**
83+
* Topic validator map
84+
*
85+
* Keyed by topic
86+
* Topic validators are functions with the following input:
87+
* (topic: string, peer: Peer, message: rpc.RPC)
88+
*/
89+
this.topicValidators = new Map()
8190
}
8291

8392
/**
@@ -183,17 +192,14 @@ class BasicPubSub extends Pubsub {
183192
this.seenCache.put(msgID)
184193

185194
// Ensure the message is valid before processing it
186-
let isValid
187-
let error
188-
189195
try {
190-
isValid = await this.validate(message)
196+
const isValid = await this.validate(peer, message)
197+
if (!isValid) {
198+
this.log('Message is invalid, dropping it.')
199+
return
200+
}
191201
} catch (err) {
192-
error = err
193-
}
194-
195-
if (error || !isValid) {
196-
this.log('Message could not be validated, dropping it. isValid=%s', isValid, error)
202+
this.log('Error in message validation, dropping it. %O', err)
197203
return
198204
}
199205

@@ -203,6 +209,46 @@ class BasicPubSub extends Pubsub {
203209
this._handleRpcControl(peer, rpc)
204210
}
205211

212+
/**
213+
* Validates the given message.
214+
* @param {Peer} peer
215+
* @param {rpc.RPC.Message} message
216+
* @returns {Promise<Boolean>}
217+
*/
218+
async validate (peer, message) {
219+
const isValid = await super.validate(message)
220+
if (!isValid) {
221+
return false
222+
}
223+
for (const topic of message.topicIDs) {
224+
const validatorFn = this.topicValidators.get(topic)
225+
if (validatorFn) {
226+
const result = validatorFn(topic, peer, message)
227+
if (!this._processTopicValidatorResult(topic, peer, message, result)) {
228+
return false
229+
}
230+
}
231+
}
232+
return true
233+
}
234+
235+
/**
236+
* Coerces topic validator result to determine message validity
237+
*
238+
* Defaults to true if truthy
239+
*
240+
* Override this method to provide custom topic validator result processing (eg: scoring)
241+
*
242+
* @param {String} topic
243+
* @param {Peer} peer
244+
* @param {rpc.RPC.Message} message
245+
* @param {unknown} result
246+
* @returns {Promise<Boolean>}
247+
*/
248+
_processTopicValidatorResult (topic, peer, message, result) {
249+
return Boolean(result)
250+
}
251+
206252
/**
207253
* @param {rpc.RPC.Message} msg
208254
*/

test/2-nodes.spec.js

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,16 @@ describe('2 nodes', () => {
8888

8989
it('Subscribe to a topic', async () => {
9090
const topic = 'Z'
91-
await new Promise((resolve) => setTimeout(resolve, 2000))
9291
nodes[0].subscribe(topic)
9392
nodes[1].subscribe(topic)
9493

9594
// await subscription change
96-
const [changedPeerId, changedTopics, changedSubs] = await new Promise((resolve) => {
97-
nodes[0].once('pubsub:subscription-change', (...args) => resolve(args))
98-
})
95+
const [evt0] = await Promise.all([
96+
new Promise(resolve => nodes[0].once('pubsub:subscription-change', (...args) => resolve(args))),
97+
new Promise(resolve => nodes[1].once('pubsub:subscription-change', (...args) => resolve(args)))
98+
])
99+
100+
const [changedPeerId, changedTopics, changedSubs] = evt0
99101

100102
expectSet(nodes[0].subscriptions, [topic])
101103
expectSet(nodes[1].subscriptions, [topic])
@@ -134,7 +136,9 @@ describe('2 nodes', () => {
134136
nodes[1].subscribe(topic)
135137

136138
// await subscription change and heartbeat
137-
await new Promise((resolve) => nodes[0].once('pubsub:subscription-change', resolve))
139+
await Promise.all(
140+
nodes.map(n => new Promise(resolve => n.once('pubsub:subscription-change', resolve)))
141+
)
138142
await Promise.all([
139143
new Promise((resolve) => nodes[0].once('gossipsub:heartbeat', resolve)),
140144
new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve))

test/pubsub.spec.js

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ describe('Pubsub', () => {
3636

3737
// Get the first message sent to _publish, and validate it
3838
const signedMessage = gossipsub._publish.getCall(0).lastArg[0]
39-
const isValid = await gossipsub.validate(signedMessage)
39+
const isValid = await gossipsub.validate({}, signedMessage)
4040

4141
expect(isValid).to.eql(true)
4242
})
@@ -152,4 +152,71 @@ describe('Pubsub', () => {
152152
await pWaitFor(() => gossipsub._onPeerDisconnected.calledWith(peerId), { timeout: 1000 })
153153
})
154154
})
155+
156+
describe('topic validators', () => {
157+
it('should filter messages by topic validator', async () => {
158+
// use onRpcMessage.callCount to see if a message is valid or not
159+
// a valid message will trigger onRpcMessage
160+
sinon.spy(gossipsub, '_onRpcMessage')
161+
// Disable strict signing
162+
sinon.stub(gossipsub, 'strictSigning').value(false)
163+
const filteredTopic = 't'
164+
165+
// Set a trivial topic validator
166+
gossipsub.topicValidators.set(filteredTopic, (topic, peer, message) => {
167+
return message.data.equals(Buffer.from('a message'))
168+
})
169+
170+
// valid case
171+
const validRpc = {
172+
subscriptions: [],
173+
msgs: [{
174+
from: gossipsub.peerId.id,
175+
data: Buffer.from('a message'),
176+
seqno: utils.randomSeqno(),
177+
topicIDs: [filteredTopic]
178+
}]
179+
}
180+
181+
// process valid message
182+
gossipsub._onRpc('QmAnotherPeer', {}, validRpc)
183+
await new Promise(resolve => setTimeout(resolve, 500))
184+
expect(gossipsub._onRpcMessage.callCount).to.eql(1)
185+
186+
// invalid case
187+
const invalidRpc = {
188+
subscriptions: [],
189+
msgs: [{
190+
from: gossipsub.peerId.id,
191+
data: Buffer.from('a different message'),
192+
seqno: utils.randomSeqno(),
193+
topicIDs: [filteredTopic]
194+
}]
195+
}
196+
197+
// process invalid message
198+
gossipsub._onRpc('QmAnotherPeer', {}, invalidRpc)
199+
await new Promise(resolve => setTimeout(resolve, 500))
200+
expect(gossipsub._onRpcMessage.callCount).to.eql(1)
201+
202+
// remove topic validator
203+
gossipsub.topicValidators.delete(filteredTopic)
204+
205+
// another invalid case
206+
const invalidRpc2 = {
207+
subscriptions: [],
208+
msgs: [{
209+
from: gossipsub.peerId.id,
210+
data: Buffer.from('a different message'),
211+
seqno: utils.randomSeqno(),
212+
topicIDs: [filteredTopic]
213+
}]
214+
}
215+
216+
// process previously invalid message, now is valid
217+
gossipsub._onRpc('QmAnotherPeer', {}, invalidRpc2)
218+
await new Promise(resolve => setTimeout(resolve, 500))
219+
expect(gossipsub._onRpcMessage.callCount).to.eql(2)
220+
})
221+
})
155222
})

0 commit comments

Comments
 (0)