From 20ddde364929ede771bb6b232655dbee47c17e3c Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Wed, 23 Mar 2022 10:37:10 +0700 Subject: [PATCH 1/2] Forward messages to floodsub peers --- ts/index.ts | 43 +++++++++++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/ts/index.ts b/ts/index.ts index 988e495d..a9bdc157 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -185,6 +185,9 @@ export default class Gossipsub extends EventEmitter { /** Direct peers */ private readonly direct = new Set() + /** Floodsub peers */ + private readonly floodsubPeers = new Set() + /** Cache of seen messages */ private readonly seenCache: SimpleTimeCache @@ -595,29 +598,33 @@ export default class Gossipsub extends EventEmitter { * Add a peer to the router */ private addPeer(peerId: PeerId, protocol: string, direction: ConnectionDirection): PeerStreams { - let peerStreams = this.peers.get(peerId.toB58String()) + const peerIdStr = peerId.toB58String() + let peerStreams = this.peers.get(peerIdStr) // If peer streams already exists, do nothing if (peerStreams === undefined) { // else create a new peer streams - this.log('new peer %s', peerId.toB58String()) + this.log('new peer %s', peerIdStr) peerStreams = new PeerStreams({ id: peerId, protocol }) - this.peers.set(peerId.toB58String(), peerStreams) + this.peers.set(peerIdStr, peerStreams) peerStreams.addListener('close', () => this.removePeer(peerId)) } // Add to peer scoring - this.score.addPeer(peerId.toB58String()) + this.score.addPeer(peerIdStr) + if (protocol === constants.FloodsubID) { + this.floodsubPeers.add(peerIdStr) + } this.metrics?.peersPerProtocol.inc({ protocol }, 1) // track the connection direction. Don't allow to unset outbound - if (!this.outbound.get(peerId.toB58String())) { - this.outbound.set(peerId.toB58String(), direction === 'outbound') + if (!this.outbound.get(peerIdStr)) { + this.outbound.set(peerIdStr, direction === 'outbound') } return peerStreams @@ -660,6 +667,8 @@ export default class Gossipsub extends EventEmitter { peers.delete(id) } + // Remove from floodsubPeers + this.floodsubPeers.delete(id) // Remove from gossip mapping this.gossip.delete(id) // Remove from control mapping @@ -1617,6 +1626,18 @@ export default class Gossipsub extends EventEmitter { tosend.add(peer) } }) + + // as of Mar 2022, rust-libp2p does not have this while golang-libp2p and the spec do have + this.floodsubPeers.forEach((peer) => { + if ( + peersInTopic.has(peer) && + propagationSource !== peer && + !excludePeers?.has(peer) && + this.score.score(peer) >= this.opts.scoreThresholds.publishThreshold + ) { + tosend.add(peer) + } + }) } // add mesh peers @@ -1673,14 +1694,8 @@ export default class Gossipsub extends EventEmitter { }) // floodsub peers - peersInTopic.forEach((id) => { - const peerStreams = this.peers.get(id) - - if ( - peerStreams && - peerStreams.protocol === constants.FloodsubID && - this.score.score(id) >= this.opts.scoreThresholds.publishThreshold - ) { + this.floodsubPeers.forEach((id) => { + if (peersInTopic.has(id) && this.score.score(id) >= this.opts.scoreThresholds.publishThreshold) { tosend.add(id) tosendCount.floodsub++ } From 47d32d3a5a8e39fd0bd031be5061141028cfee7f Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 23 Mar 2022 09:22:36 +0530 Subject: [PATCH 2/2] Add comments --- ts/index.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ts/index.ts b/ts/index.ts index a9bdc157..185096fb 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -1627,7 +1627,9 @@ export default class Gossipsub extends EventEmitter { } }) - // as of Mar 2022, rust-libp2p does not have this while golang-libp2p and the spec do have + // As of Mar 2022, spec + golang-libp2p include this while rust-libp2p does not + // rust-libp2p: https://github.com/libp2p/rust-libp2p/blob/6cc3b4ec52c922bfcf562a29b5805c3150e37c75/protocols/gossipsub/src/behaviour.rs#L2693 + // spec: https://github.com/libp2p/specs/blob/10712c55ab309086a52eec7d25f294df4fa96528/pubsub/gossipsub/gossipsub-v1.0.md?plain=1#L361 this.floodsubPeers.forEach((peer) => { if ( peersInTopic.has(peer) && @@ -1694,6 +1696,7 @@ export default class Gossipsub extends EventEmitter { }) // floodsub peers + // Note: if there are no floodsub peers, we save a loop through peersInTopic Map this.floodsubPeers.forEach((id) => { if (peersInTopic.has(id) && this.score.score(id) >= this.opts.scoreThresholds.publishThreshold) { tosend.add(id)