Skip to content

Commit 2ca29fa

Browse files
twoethsdapplion
andauthored
Forward messages to floodsub peers (#214)
* Forward messages to floodsub peers * Add comments Co-authored-by: Lion - dapplion <[email protected]>
1 parent d469111 commit 2ca29fa

File tree

1 file changed

+32
-14
lines changed

1 file changed

+32
-14
lines changed

ts/index.ts

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,9 @@ export default class Gossipsub extends EventEmitter {
185185
/** Direct peers */
186186
private readonly direct = new Set<PeerIdStr>()
187187

188+
/** Floodsub peers */
189+
private readonly floodsubPeers = new Set<PeerIdStr>()
190+
188191
/** Cache of seen messages */
189192
private readonly seenCache: SimpleTimeCache<void>
190193

@@ -595,29 +598,33 @@ export default class Gossipsub extends EventEmitter {
595598
* Add a peer to the router
596599
*/
597600
private addPeer(peerId: PeerId, protocol: string, direction: ConnectionDirection): PeerStreams {
598-
let peerStreams = this.peers.get(peerId.toB58String())
601+
const peerIdStr = peerId.toB58String()
602+
let peerStreams = this.peers.get(peerIdStr)
599603

600604
// If peer streams already exists, do nothing
601605
if (peerStreams === undefined) {
602606
// else create a new peer streams
603-
this.log('new peer %s', peerId.toB58String())
607+
this.log('new peer %s', peerIdStr)
604608

605609
peerStreams = new PeerStreams({
606610
id: peerId,
607611
protocol
608612
})
609613

610-
this.peers.set(peerId.toB58String(), peerStreams)
614+
this.peers.set(peerIdStr, peerStreams)
611615
peerStreams.addListener('close', () => this.removePeer(peerId))
612616
}
613617

614618
// Add to peer scoring
615-
this.score.addPeer(peerId.toB58String())
619+
this.score.addPeer(peerIdStr)
620+
if (protocol === constants.FloodsubID) {
621+
this.floodsubPeers.add(peerIdStr)
622+
}
616623
this.metrics?.peersPerProtocol.inc({ protocol }, 1)
617624

618625
// track the connection direction. Don't allow to unset outbound
619-
if (!this.outbound.get(peerId.toB58String())) {
620-
this.outbound.set(peerId.toB58String(), direction === 'outbound')
626+
if (!this.outbound.get(peerIdStr)) {
627+
this.outbound.set(peerIdStr, direction === 'outbound')
621628
}
622629

623630
return peerStreams
@@ -660,6 +667,8 @@ export default class Gossipsub extends EventEmitter {
660667
peers.delete(id)
661668
}
662669

670+
// Remove from floodsubPeers
671+
this.floodsubPeers.delete(id)
663672
// Remove from gossip mapping
664673
this.gossip.delete(id)
665674
// Remove from control mapping
@@ -1617,6 +1626,20 @@ export default class Gossipsub extends EventEmitter {
16171626
tosend.add(peer)
16181627
}
16191628
})
1629+
1630+
// As of Mar 2022, spec + golang-libp2p include this while rust-libp2p does not
1631+
// rust-libp2p: https://github.com/libp2p/rust-libp2p/blob/6cc3b4ec52c922bfcf562a29b5805c3150e37c75/protocols/gossipsub/src/behaviour.rs#L2693
1632+
// spec: https://github.com/libp2p/specs/blob/10712c55ab309086a52eec7d25f294df4fa96528/pubsub/gossipsub/gossipsub-v1.0.md?plain=1#L361
1633+
this.floodsubPeers.forEach((peer) => {
1634+
if (
1635+
peersInTopic.has(peer) &&
1636+
propagationSource !== peer &&
1637+
!excludePeers?.has(peer) &&
1638+
this.score.score(peer) >= this.opts.scoreThresholds.publishThreshold
1639+
) {
1640+
tosend.add(peer)
1641+
}
1642+
})
16201643
}
16211644

16221645
// add mesh peers
@@ -1673,14 +1696,9 @@ export default class Gossipsub extends EventEmitter {
16731696
})
16741697

16751698
// floodsub peers
1676-
peersInTopic.forEach((id) => {
1677-
const peerStreams = this.peers.get(id)
1678-
1679-
if (
1680-
peerStreams &&
1681-
peerStreams.protocol === constants.FloodsubID &&
1682-
this.score.score(id) >= this.opts.scoreThresholds.publishThreshold
1683-
) {
1699+
// Note: if there are no floodsub peers, we save a loop through peersInTopic Map
1700+
this.floodsubPeers.forEach((id) => {
1701+
if (peersInTopic.has(id) && this.score.score(id) >= this.opts.scoreThresholds.publishThreshold) {
16841702
tosend.add(id)
16851703
tosendCount.floodsub++
16861704
}

0 commit comments

Comments
 (0)