Skip to content

Commit e559614

Browse files
committed
refactor: switch to async iterators
BREAKING CHANGE: Switch to using async/await and async iterators for all the API. Moreover, gossipsub does not need the libp2p instance anymore, receiving a registerar that enables it to receive the necessary events from libp2p
1 parent 660b054 commit e559614

24 files changed

+1631
-2046
lines changed

package.json

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,33 +32,31 @@
3232
"lint"
3333
],
3434
"dependencies": {
35-
"async": "^2.6.2",
35+
"debug": "^4.1.1",
3636
"err-code": "^2.0.0",
37-
"libp2p-floodsub": "~0.18.0",
38-
"libp2p-pubsub": "~0.2.1",
39-
"multistream-select": "~0.14.6",
40-
"peer-id": "~0.12.5",
41-
"peer-info": "~0.15.1",
37+
"it-length-prefixed": "^2.0.0",
38+
"it-pipe": "^1.0.1",
39+
"libp2p-floodsub": "libp2p/js-libp2p-floodsub#refactor/async",
40+
"libp2p-pubsub": "libp2p/js-libp2p-pubsub#refactor/async",
41+
"p-map": "^3.0.0",
42+
"peer-id": "~0.13.3",
43+
"peer-info": "~0.17.0",
4244
"protons": "^1.0.1",
43-
"pull-length-prefixed": "^1.3.3",
44-
"pull-stream": "^3.6.14"
45+
"time-cache": "^0.3.0"
4546
},
4647
"devDependencies": {
4748
"@types/chai": "^4.2.3",
4849
"@types/mocha": "^5.2.7",
49-
"aegir": "^20.3.1",
50+
"aegir": "^20.4.1",
5051
"benchmark": "^2.1.4",
5152
"chai": "^4.2.0",
5253
"chai-spies": "^1.0.0",
5354
"detect-node": "^2.0.4",
5455
"dirty-chai": "^2.0.1",
55-
"libp2p": "~0.26.2",
56-
"libp2p-secio": "~0.11.1",
57-
"libp2p-spdy": "~0.13.3",
58-
"libp2p-tcp": "~0.13.2",
59-
"libp2p-websockets": "~0.12.4",
56+
"it-pair": "^1.0.0",
6057
"lodash": "^4.17.15",
6158
"mocha": "^6.2.1",
59+
"p-times": "^2.1.0",
6260
"promisify-es6": "^1.0.3",
6361
"sinon": "^7.5.0"
6462
},

src/heartbeat.js

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ class Heartbeat {
1212
this.gossipsub = gossipsub
1313
}
1414

15-
start (callback) {
15+
start () {
1616
if (this._heartbeatTimer) {
1717
const errMsg = 'Heartbeat timer is already running'
1818
this.gossipsub.log(errMsg)
19-
return callback(errcode(new Error(errMsg), 'ERR_HEARTBEAT_ALREADY_RUNNING'))
19+
throw errcode(new Error(errMsg), 'ERR_HEARTBEAT_ALREADY_RUNNING')
2020
}
2121

2222
const heartbeatTimer = {
@@ -25,39 +25,35 @@ class Heartbeat {
2525
runPeriodically: (fn, period) => {
2626
heartbeatTimer._timeoutId = setInterval(fn, period)
2727
},
28-
cancel: (cb) => {
28+
cancel: () => {
2929
clearTimeout(heartbeatTimer._timeoutId)
30-
cb()
3130
}
3231
}
3332

3433
const heartbeat = this._heartbeat.bind(this)
34+
3535
setTimeout(() => {
3636
heartbeat()
3737
heartbeatTimer.runPeriodically(heartbeat, constants.GossipSubHeartbeatInterval)
3838
}, constants.GossipSubHeartbeatInitialDelay)
3939

4040
this._heartbeatTimer = heartbeatTimer
41-
callback()
4241
}
4342

4443
/**
4544
* Unmounts the gossipsub protocol and shuts down every connection
46-
*
4745
* @override
48-
* @param {Function} callback
4946
* @returns {void}
5047
*/
51-
stop (callback) {
48+
stop () {
5249
if (!this._heartbeatTimer) {
5350
const errMsg = 'Heartbeat timer is not running'
5451
this.gossipsub.log(errMsg)
55-
return callback(errcode(new Error(errMsg), 'ERR_HEARTBEAT_NO_RUNNING'))
52+
throw errcode(new Error(errMsg), 'ERR_HEARTBEAT_NO_RUNNING')
5653
}
57-
this._heartbeatTimer.cancel(() => {
58-
this._heartbeatTimer = null
59-
callback()
60-
})
54+
55+
this._heartbeatTimer.cancel()
56+
this._heartbeatTimer = null
6157
}
6258

6359
/**

src/index.js

Lines changed: 49 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
const assert = require('assert')
44
const { utils } = require('libp2p-pubsub')
55

6+
const PeerInfo = require('peer-info')
7+
68
const BasicPubsub = require('./pubsub')
79
const { MessageCache } = require('./messageCache')
810

@@ -12,15 +14,24 @@ const Heartbeat = require('./heartbeat')
1214

1315
class GossipSub extends BasicPubsub {
1416
/**
15-
* @param {Object} libp2p an instance of Libp2p
16-
* @param {Object} options
17-
* @param {bool} options.emitSelf if publish should emit to self, if subscribed, defaults to false
18-
* @param {bool} options.gossipIncoming if incoming messages on a subscribed topic should be automatically gossiped, defaults to true
19-
* @param {bool} options.fallbackToFloodsub if dial should fallback to floodsub, defaults to true
17+
* @param {PeerInfo} peerInfo instance of the peer's PeerInfo
18+
* @param {Object} registrar
19+
* @param {function} registrar.register
20+
* @param {function} registrar.unregister
21+
* @param {Object} [options]
22+
* @param {bool} [options.emitSelf] if publish should emit to self, if subscribed, defaults to false
23+
* @param {bool} [options.gossipIncoming] if incoming messages on a subscribed topic should be automatically gossiped, defaults to true
24+
* @param {bool} [options.fallbackToFloodsub] if dial should fallback to floodsub, defaults to true
2025
* @constructor
2126
*/
22-
constructor (libp2p, options) {
23-
super('libp2p:gossipsub', constants.GossipSubID, libp2p, options)
27+
constructor (peerInfo, registrar, options = {}) {
28+
assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`')
29+
30+
// registrar handling
31+
assert(registrar && typeof registrar.register === 'function', 'a register function must be provided in registrar')
32+
assert(registrar && typeof registrar.unregister === 'function', 'a unregister function must be provided in registrar')
33+
34+
super('libp2p:gossipsub', constants.GossipSubID, peerInfo, registrar, options)
2435

2536
/**
2637
* Map of topic meshes
@@ -71,37 +82,35 @@ class GossipSub extends BasicPubsub {
7182

7283
/**
7384
* Removes a peer from the router
74-
*
7585
* @override
7686
* @param {Peer} peer
7787
* @returns {PeerInfo}
7888
*/
7989
_removePeer (peer) {
8090
super._removePeer(peer)
81-
// Only delete when no one else if referencing this peer.
82-
if (peer._references === 0) {
83-
// Remove this peer from the mesh
84-
// eslint-disable-next-line no-unused-vars
85-
for (const [_, peers] of this.mesh.entries()) {
86-
peers.delete(peer)
87-
}
88-
// Remove this peer from the fanout
89-
// eslint-disable-next-line no-unused-vars
90-
for (const [_, peers] of this.fanout.entries()) {
91-
peers.delete(peer)
92-
}
9391

94-
// Remove from gossip mapping
95-
this.gossip.delete(peer)
96-
// Remove from control mapping
97-
this.control.delete(peer)
92+
// Remove this peer from the mesh
93+
// eslint-disable-next-line no-unused-vars
94+
for (const [_, peers] of this.mesh.entries()) {
95+
peers.delete(peer)
9896
}
97+
98+
// Remove this peer from the fanout
99+
// eslint-disable-next-line no-unused-vars
100+
for (const [_, peers] of this.fanout.entries()) {
101+
peers.delete(peer)
102+
}
103+
104+
// Remove from gossip mapping
105+
this.gossip.delete(peer)
106+
// Remove from control mapping
107+
this.control.delete(peer)
108+
99109
return peer
100110
}
101111

102112
/**
103113
* Handles an rpc control message from a peer
104-
*
105114
* @param {Peer} peer
106115
* @param {rpc.RPC} rpc
107116
* @returns {void}
@@ -139,6 +148,7 @@ class GossipSub extends BasicPubsub {
139148
if (!this._options.gossipIncoming) {
140149
return
141150
}
151+
142152
// Emit to floodsub peers
143153
this.peers.forEach((peer) => {
144154
if (peer.info.protocols.has(constants.FloodSubID) &&
@@ -168,10 +178,8 @@ class GossipSub extends BasicPubsub {
168178

169179
/**
170180
* Handles IHAVE messages
171-
*
172181
* @param {Peer} peer
173182
* @param {Array<rpc.RPC.ControlIHave>} ihave
174-
*
175183
* @returns {rpc.RPC.ControlIWant}
176184
*/
177185
_handleIHave (peer, ihave) {
@@ -204,10 +212,8 @@ class GossipSub extends BasicPubsub {
204212
/**
205213
* Handles IWANT messages
206214
* Returns messages to send back to peer
207-
*
208215
* @param {Peer} peer
209216
* @param {Array<rpc.RPC.ControlIWant>} iwant
210-
*
211217
* @returns {Array<rpc.RPC.Message>}
212218
*/
213219
_handleIWant (peer, iwant) {
@@ -234,12 +240,9 @@ class GossipSub extends BasicPubsub {
234240

235241
/**
236242
* Handles Graft messages
237-
*
238243
* @param {Peer} peer
239244
* @param {Array<rpc.RPC.ControlGraft>} graft
240-
*
241245
* @return {Array<rpc.RPC.ControlPrune>}
242-
*
243246
*/
244247
_handleGraft (peer, graft) {
245248
const prune = []
@@ -271,12 +274,9 @@ class GossipSub extends BasicPubsub {
271274

272275
/**
273276
* Handles Prune messages
274-
*
275277
* @param {Peer} peer
276278
* @param {Array<rpc.RPC.ControlPrune>} prune
277-
*
278279
* @returns {void}
279-
*
280280
*/
281281
_handlePrune (peer, prune) {
282282
prune.forEach(({ topicID }) => {
@@ -292,36 +292,28 @@ class GossipSub extends BasicPubsub {
292292
/**
293293
* Mounts the gossipsub protocol onto the libp2p node and sends our
294294
* our subscriptions to every peer connected
295-
*
296295
* @override
297-
* @param {Function} callback
298-
* @returns {void}
299-
*
296+
* @returns {Promise}
300297
*/
301-
start (callback) {
302-
super.start((err) => {
303-
if (err) return callback(err)
304-
this.heartbeat.start(callback)
305-
})
298+
async start () {
299+
await super.start()
300+
this.heartbeat.start()
306301
}
307302

308303
/**
309304
* Unmounts the gossipsub protocol and shuts down every connection
310-
*
311305
* @override
312-
* @param {Function} callback
313-
* @returns {void}
306+
* @returns {Promise}
314307
*/
315-
stop (callback) {
316-
super.stop((err) => {
317-
if (err) return callback(err)
318-
this.mesh = new Map()
319-
this.fanout = new Map()
320-
this.lastpub = new Map()
321-
this.gossip = new Map()
322-
this.control = new Map()
323-
this.heartbeat.stop(callback)
324-
})
308+
async stop () {
309+
await super.stop()
310+
this.heartbeat.stop()
311+
312+
this.mesh = new Map()
313+
this.fanout = new Map()
314+
this.lastpub = new Map()
315+
this.gossip = new Map()
316+
this.control = new Map()
325317
}
326318

327319
/**
@@ -355,7 +347,6 @@ class GossipSub extends BasicPubsub {
355347

356348
/**
357349
* Leave topics
358-
*
359350
* @param {Array<string>|string} topics
360351
* @returns {void}
361352
*/
@@ -431,7 +422,6 @@ class GossipSub extends BasicPubsub {
431422

432423
/**
433424
* Sends a GRAFT message to a peer
434-
*
435425
* @param {Peer} peer
436426
* @param {String} topic
437427
* @returns {void}
@@ -447,7 +437,6 @@ class GossipSub extends BasicPubsub {
447437

448438
/**
449439
* Sends a PRUNE message to a peer
450-
*
451440
* @param {Peer} peer
452441
* @param {String} topic
453442
* @returns {void}
@@ -505,7 +494,6 @@ class GossipSub extends BasicPubsub {
505494

506495
/**
507496
* Send graft and prune messages
508-
*
509497
* @param {Map<Peer, Array<String>>} tograft
510498
* @param {Map<Peer, Array<String>>} toprune
511499
*/
@@ -532,7 +520,6 @@ class GossipSub extends BasicPubsub {
532520

533521
/**
534522
* Emits gossip to peers in a particular topic
535-
*
536523
* @param {String} topic
537524
* @param {Set<Peer>} peers - peers to exclude
538525
* @returns {void}
@@ -575,7 +562,6 @@ class GossipSub extends BasicPubsub {
575562

576563
/**
577564
* Adds new IHAVE messages to pending gossip
578-
*
579565
* @param {Peer} peer
580566
* @param {Array<rpc.RPC.ControlIHave>} controlIHaveMsgs
581567
* @returns {void}
@@ -588,7 +574,6 @@ class GossipSub extends BasicPubsub {
588574

589575
/**
590576
* Returns the current time in milliseconds
591-
*
592577
* @returns {number}
593578
*/
594579
_now () {

0 commit comments

Comments
 (0)