Skip to content

Commit 3027835

Browse files
committed
refactor: switch to async iterators
BREAKING CHANGE: Switch to using async/await and async iterators for all the API. Moreover, gossipsub does not need the libp2p instance anymore, receiving a registerar that enables it to receive the necessary events from libp2p
1 parent 660b054 commit 3027835

24 files changed

+1646
-2048
lines changed

package.json

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,33 +32,31 @@
3232
"lint"
3333
],
3434
"dependencies": {
35-
"async": "^2.6.2",
35+
"debug": "^4.1.1",
3636
"err-code": "^2.0.0",
37-
"libp2p-floodsub": "~0.18.0",
38-
"libp2p-pubsub": "~0.2.1",
39-
"multistream-select": "~0.14.6",
40-
"peer-id": "~0.12.5",
41-
"peer-info": "~0.15.1",
37+
"it-length-prefixed": "^2.0.0",
38+
"it-pipe": "^1.0.1",
39+
"libp2p-floodsub": "libp2p/js-libp2p-floodsub#refactor/async",
40+
"libp2p-pubsub": "libp2p/js-libp2p-pubsub#refactor/async",
41+
"p-map": "^3.0.0",
42+
"peer-id": "~0.13.3",
43+
"peer-info": "~0.17.0",
4244
"protons": "^1.0.1",
43-
"pull-length-prefixed": "^1.3.3",
44-
"pull-stream": "^3.6.14"
45+
"time-cache": "^0.3.0"
4546
},
4647
"devDependencies": {
4748
"@types/chai": "^4.2.3",
4849
"@types/mocha": "^5.2.7",
49-
"aegir": "^20.3.1",
50+
"aegir": "^20.4.1",
5051
"benchmark": "^2.1.4",
5152
"chai": "^4.2.0",
5253
"chai-spies": "^1.0.0",
5354
"detect-node": "^2.0.4",
5455
"dirty-chai": "^2.0.1",
55-
"libp2p": "~0.26.2",
56-
"libp2p-secio": "~0.11.1",
57-
"libp2p-spdy": "~0.13.3",
58-
"libp2p-tcp": "~0.13.2",
59-
"libp2p-websockets": "~0.12.4",
56+
"it-pair": "^1.0.0",
6057
"lodash": "^4.17.15",
6158
"mocha": "^6.2.1",
59+
"p-times": "^2.1.0",
6260
"promisify-es6": "^1.0.3",
6361
"sinon": "^7.5.0"
6462
},

src/heartbeat.js

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ class Heartbeat {
1212
this.gossipsub = gossipsub
1313
}
1414

15-
start (callback) {
15+
start () {
1616
if (this._heartbeatTimer) {
1717
const errMsg = 'Heartbeat timer is already running'
1818
this.gossipsub.log(errMsg)
19-
return callback(errcode(new Error(errMsg), 'ERR_HEARTBEAT_ALREADY_RUNNING'))
19+
throw errcode(new Error(errMsg), 'ERR_HEARTBEAT_ALREADY_RUNNING')
2020
}
2121

2222
const heartbeatTimer = {
@@ -25,39 +25,35 @@ class Heartbeat {
2525
runPeriodically: (fn, period) => {
2626
heartbeatTimer._timeoutId = setInterval(fn, period)
2727
},
28-
cancel: (cb) => {
28+
cancel: () => {
2929
clearTimeout(heartbeatTimer._timeoutId)
30-
cb()
3130
}
3231
}
3332

3433
const heartbeat = this._heartbeat.bind(this)
34+
3535
setTimeout(() => {
3636
heartbeat()
3737
heartbeatTimer.runPeriodically(heartbeat, constants.GossipSubHeartbeatInterval)
3838
}, constants.GossipSubHeartbeatInitialDelay)
3939

4040
this._heartbeatTimer = heartbeatTimer
41-
callback()
4241
}
4342

4443
/**
4544
* Unmounts the gossipsub protocol and shuts down every connection
46-
*
4745
* @override
48-
* @param {Function} callback
4946
* @returns {void}
5047
*/
51-
stop (callback) {
48+
stop () {
5249
if (!this._heartbeatTimer) {
5350
const errMsg = 'Heartbeat timer is not running'
5451
this.gossipsub.log(errMsg)
55-
return callback(errcode(new Error(errMsg), 'ERR_HEARTBEAT_NO_RUNNING'))
52+
throw errcode(new Error(errMsg), 'ERR_HEARTBEAT_NO_RUNNING')
5653
}
57-
this._heartbeatTimer.cancel(() => {
58-
this._heartbeatTimer = null
59-
callback()
60-
})
54+
55+
this._heartbeatTimer.cancel()
56+
this._heartbeatTimer = null
6157
}
6258

6359
/**

src/index.js

Lines changed: 55 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
const assert = require('assert')
44
const { utils } = require('libp2p-pubsub')
55

6+
const PeerInfo = require('peer-info')
7+
68
const BasicPubsub = require('./pubsub')
79
const { MessageCache } = require('./messageCache')
810

@@ -12,15 +14,30 @@ const Heartbeat = require('./heartbeat')
1214

1315
class GossipSub extends BasicPubsub {
1416
/**
15-
* @param {Object} libp2p an instance of Libp2p
16-
* @param {Object} options
17-
* @param {bool} options.emitSelf if publish should emit to self, if subscribed, defaults to false
18-
* @param {bool} options.gossipIncoming if incoming messages on a subscribed topic should be automatically gossiped, defaults to true
19-
* @param {bool} options.fallbackToFloodsub if dial should fallback to floodsub, defaults to true
17+
* @param {PeerInfo} peerInfo instance of the peer's PeerInfo
18+
* @param {Object} registrar
19+
* @param {function} registrar.register
20+
* @param {function} registrar.unregister
21+
* @param {Object} [options]
22+
* @param {bool} [options.emitSelf] if publish should emit to self, if subscribed, defaults to false
23+
* @param {bool} [options.gossipIncoming] if incoming messages on a subscribed topic should be automatically gossiped, defaults to true
24+
* @param {bool} [options.fallbackToFloodsub] if dial should fallback to floodsub, defaults to true
2025
* @constructor
2126
*/
22-
constructor (libp2p, options) {
23-
super('libp2p:gossipsub', constants.GossipSubID, libp2p, options)
27+
constructor (peerInfo, registrar, options = {}) {
28+
assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`')
29+
30+
// registrar handling
31+
assert(registrar && typeof registrar.register === 'function', 'a register function must be provided in registrar')
32+
assert(registrar && typeof registrar.unregister === 'function', 'a unregister function must be provided in registrar')
33+
34+
super({
35+
debugName: 'libp2p:gossipsub',
36+
multicodec: constants.GossipSubID,
37+
peerInfo,
38+
registrar,
39+
options
40+
})
2441

2542
/**
2643
* Map of topic meshes
@@ -71,37 +88,35 @@ class GossipSub extends BasicPubsub {
7188

7289
/**
7390
* Removes a peer from the router
74-
*
7591
* @override
7692
* @param {Peer} peer
7793
* @returns {PeerInfo}
7894
*/
7995
_removePeer (peer) {
8096
super._removePeer(peer)
81-
// Only delete when no one else if referencing this peer.
82-
if (peer._references === 0) {
83-
// Remove this peer from the mesh
84-
// eslint-disable-next-line no-unused-vars
85-
for (const [_, peers] of this.mesh.entries()) {
86-
peers.delete(peer)
87-
}
88-
// Remove this peer from the fanout
89-
// eslint-disable-next-line no-unused-vars
90-
for (const [_, peers] of this.fanout.entries()) {
91-
peers.delete(peer)
92-
}
9397

94-
// Remove from gossip mapping
95-
this.gossip.delete(peer)
96-
// Remove from control mapping
97-
this.control.delete(peer)
98+
// Remove this peer from the mesh
99+
// eslint-disable-next-line no-unused-vars
100+
for (const [_, peers] of this.mesh.entries()) {
101+
peers.delete(peer)
102+
}
103+
104+
// Remove this peer from the fanout
105+
// eslint-disable-next-line no-unused-vars
106+
for (const [_, peers] of this.fanout.entries()) {
107+
peers.delete(peer)
98108
}
109+
110+
// Remove from gossip mapping
111+
this.gossip.delete(peer)
112+
// Remove from control mapping
113+
this.control.delete(peer)
114+
99115
return peer
100116
}
101117

102118
/**
103119
* Handles an rpc control message from a peer
104-
*
105120
* @param {Peer} peer
106121
* @param {rpc.RPC} rpc
107122
* @returns {void}
@@ -139,6 +154,7 @@ class GossipSub extends BasicPubsub {
139154
if (!this._options.gossipIncoming) {
140155
return
141156
}
157+
142158
// Emit to floodsub peers
143159
this.peers.forEach((peer) => {
144160
if (peer.info.protocols.has(constants.FloodSubID) &&
@@ -168,10 +184,8 @@ class GossipSub extends BasicPubsub {
168184

169185
/**
170186
* Handles IHAVE messages
171-
*
172187
* @param {Peer} peer
173188
* @param {Array<rpc.RPC.ControlIHave>} ihave
174-
*
175189
* @returns {rpc.RPC.ControlIWant}
176190
*/
177191
_handleIHave (peer, ihave) {
@@ -204,10 +218,8 @@ class GossipSub extends BasicPubsub {
204218
/**
205219
* Handles IWANT messages
206220
* Returns messages to send back to peer
207-
*
208221
* @param {Peer} peer
209222
* @param {Array<rpc.RPC.ControlIWant>} iwant
210-
*
211223
* @returns {Array<rpc.RPC.Message>}
212224
*/
213225
_handleIWant (peer, iwant) {
@@ -234,12 +246,9 @@ class GossipSub extends BasicPubsub {
234246

235247
/**
236248
* Handles Graft messages
237-
*
238249
* @param {Peer} peer
239250
* @param {Array<rpc.RPC.ControlGraft>} graft
240-
*
241251
* @return {Array<rpc.RPC.ControlPrune>}
242-
*
243252
*/
244253
_handleGraft (peer, graft) {
245254
const prune = []
@@ -271,12 +280,9 @@ class GossipSub extends BasicPubsub {
271280

272281
/**
273282
* Handles Prune messages
274-
*
275283
* @param {Peer} peer
276284
* @param {Array<rpc.RPC.ControlPrune>} prune
277-
*
278285
* @returns {void}
279-
*
280286
*/
281287
_handlePrune (peer, prune) {
282288
prune.forEach(({ topicID }) => {
@@ -292,36 +298,28 @@ class GossipSub extends BasicPubsub {
292298
/**
293299
* Mounts the gossipsub protocol onto the libp2p node and sends our
294300
* our subscriptions to every peer connected
295-
*
296301
* @override
297-
* @param {Function} callback
298-
* @returns {void}
299-
*
302+
* @returns {Promise}
300303
*/
301-
start (callback) {
302-
super.start((err) => {
303-
if (err) return callback(err)
304-
this.heartbeat.start(callback)
305-
})
304+
async start () {
305+
await super.start()
306+
this.heartbeat.start()
306307
}
307308

308309
/**
309310
* Unmounts the gossipsub protocol and shuts down every connection
310-
*
311311
* @override
312-
* @param {Function} callback
313-
* @returns {void}
312+
* @returns {Promise}
314313
*/
315-
stop (callback) {
316-
super.stop((err) => {
317-
if (err) return callback(err)
318-
this.mesh = new Map()
319-
this.fanout = new Map()
320-
this.lastpub = new Map()
321-
this.gossip = new Map()
322-
this.control = new Map()
323-
this.heartbeat.stop(callback)
324-
})
314+
async stop () {
315+
await super.stop()
316+
this.heartbeat.stop()
317+
318+
this.mesh = new Map()
319+
this.fanout = new Map()
320+
this.lastpub = new Map()
321+
this.gossip = new Map()
322+
this.control = new Map()
325323
}
326324

327325
/**
@@ -355,7 +353,6 @@ class GossipSub extends BasicPubsub {
355353

356354
/**
357355
* Leave topics
358-
*
359356
* @param {Array<string>|string} topics
360357
* @returns {void}
361358
*/
@@ -431,7 +428,6 @@ class GossipSub extends BasicPubsub {
431428

432429
/**
433430
* Sends a GRAFT message to a peer
434-
*
435431
* @param {Peer} peer
436432
* @param {String} topic
437433
* @returns {void}
@@ -447,7 +443,6 @@ class GossipSub extends BasicPubsub {
447443

448444
/**
449445
* Sends a PRUNE message to a peer
450-
*
451446
* @param {Peer} peer
452447
* @param {String} topic
453448
* @returns {void}
@@ -505,7 +500,6 @@ class GossipSub extends BasicPubsub {
505500

506501
/**
507502
* Send graft and prune messages
508-
*
509503
* @param {Map<Peer, Array<String>>} tograft
510504
* @param {Map<Peer, Array<String>>} toprune
511505
*/
@@ -532,7 +526,6 @@ class GossipSub extends BasicPubsub {
532526

533527
/**
534528
* Emits gossip to peers in a particular topic
535-
*
536529
* @param {String} topic
537530
* @param {Set<Peer>} peers - peers to exclude
538531
* @returns {void}
@@ -575,7 +568,6 @@ class GossipSub extends BasicPubsub {
575568

576569
/**
577570
* Adds new IHAVE messages to pending gossip
578-
*
579571
* @param {Peer} peer
580572
* @param {Array<rpc.RPC.ControlIHave>} controlIHaveMsgs
581573
* @returns {void}
@@ -588,7 +580,6 @@ class GossipSub extends BasicPubsub {
588580

589581
/**
590582
* Returns the current time in milliseconds
591-
*
592583
* @returns {number}
593584
*/
594585
_now () {

0 commit comments

Comments
 (0)