Skip to content

Commit d5aba97

Browse files
committed
refactor: pubsub subsystem
1 parent d313299 commit d5aba97

12 files changed

+911
-147
lines changed

README.md

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -211,22 +211,18 @@ class Node extends Libp2p {
211211

212212
**IMPORTANT NOTE**: All the methods listed in the API section that take a callback are also now Promisified. Libp2p is migrating away from callbacks to async/await, and in a future release (that will be announced in advance), callback support will be removed entirely. You can follow progress of the async/await endeavor at https://github.com/ipfs/js-ipfs/issues/1670.
213213

214-
#### Create a Node - `Libp2p.createLibp2p(options, callback)`
214+
#### Create a Node - `Libp2p.create(options)`
215215

216216
> Behaves exactly like `new Libp2p(options)`, but doesn't require a PeerInfo. One will be generated instead
217217
218218
```js
219-
const { createLibp2p } = require('libp2p')
220-
createLibp2p(options, (err, libp2p) => {
221-
if (err) throw err
222-
libp2p.start((err) => {
223-
if (err) throw err
224-
})
225-
})
219+
const { create } = require('libp2p')
220+
const libp2p = await create(options)
221+
222+
await libp2p.start()
226223
```
227224

228225
- `options`: Object of libp2p configuration options
229-
- `callback`: Function with signature `function (Error, Libp2p) {}`
230226

231227
#### Create a Node alternative - `new Libp2p(options)`
232228

package.json

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@
6161
"multiaddr": "^7.1.0",
6262
"multistream-select": "^0.15.0",
6363
"once": "^1.4.0",
64+
"p-map": "^3.0.0",
6465
"p-queue": "^6.1.1",
6566
"p-settle": "^3.1.0",
66-
"peer-book": "^0.9.1",
6767
"peer-id": "^0.13.3",
6868
"peer-info": "^0.17.0",
6969
"promisify-es6": "^1.0.3",
@@ -92,8 +92,8 @@
9292
"libp2p-bootstrap": "^0.9.7",
9393
"libp2p-delegated-content-routing": "^0.2.2",
9494
"libp2p-delegated-peer-routing": "^0.2.2",
95-
"libp2p-floodsub": "~0.17.0",
96-
"libp2p-gossipsub": "~0.0.4",
95+
"libp2p-floodsub": "libp2p/js-libp2p-floodsub#refactor/async",
96+
"libp2p-gossipsub": "ChainSafe/gossipsub-js#refactor/async",
9797
"libp2p-kad-dht": "^0.15.3",
9898
"libp2p-mdns": "^0.12.3",
9999
"libp2p-mplex": "^0.9.1",
@@ -105,6 +105,7 @@
105105
"lodash.times": "^4.3.2",
106106
"nock": "^10.0.6",
107107
"p-defer": "^3.0.0",
108+
"p-wait-for": "^3.1.0",
108109
"portfinder": "^1.0.20",
109110
"pull-goodbye": "0.0.2",
110111
"pull-length-prefixed": "^1.3.3",

src/connection-manager/topology.js

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
'use strict'
2+
3+
class Topology {
4+
/**
5+
* @param {Object} props
6+
* @param {number} props.min minimum needed connections (default: 0)
7+
* @param {number} props.max maximum needed connections (default: Infinity)
8+
* @param {function} props.onConnect protocol "onConnect" handler
9+
* @param {function} props.onDisconnect protocol "onDisconnect" handler
10+
* @param {Array<string>} props.multicodecs protocol multicodecs
11+
* @param {Registrar} registrar
12+
* @constructor
13+
*/
14+
constructor ({
15+
min = 0,
16+
max = Infinity,
17+
onConnect,
18+
onDisconnect,
19+
multicodecs,
20+
registrar
21+
}) {
22+
this.multicodecs = multicodecs
23+
this.registrar = registrar
24+
this.min = min
25+
this.max = max
26+
this.peers = new Map()
27+
28+
// Handlers
29+
this._onConnect = onConnect
30+
this._onDisconnect = onDisconnect
31+
32+
this._onProtocolChange = this._onProtocolChange.bind(this)
33+
34+
// Set by the registrar
35+
this._peerStore = null
36+
}
37+
38+
/**
39+
* Set peerstore to the topology.
40+
* @param {PeerStore} peerStore
41+
*/
42+
set peerStore (peerStore) {
43+
this._peerStore = peerStore
44+
45+
this._peerStore.on('change:protocols', this._onProtocolChange)
46+
}
47+
48+
/**
49+
* Try to add a connected peer to the topology, if inside the thresholds.
50+
* @param {PeerInfo} peerInfo
51+
* @param {Connection} connection
52+
* @returns {void}
53+
*/
54+
tryToConnect (peerInfo, connection) {
55+
// TODO: conn manager should validate if it should try to connect
56+
57+
this._onConnect(peerInfo, connection)
58+
59+
this.peers.set(peerInfo.id.toB58String(), peerInfo)
60+
}
61+
62+
/**
63+
* Notify protocol of peer disconnected.
64+
* @param {PeerInfo} peerInfo
65+
* @param {Error} [error]
66+
* @returns {void}
67+
*/
68+
disconnect (peerInfo, error) {
69+
if (this.peers.delete(peerInfo.id.toB58String())) {
70+
this._onDisconnect(peerInfo, error)
71+
}
72+
}
73+
74+
/**
75+
* Check if a new peer support the multicodecs for this topology.
76+
* @param {Object} props
77+
* @param {PeerInfo} props.peerInfo
78+
* @param {Array<string>} props.protocols
79+
*/
80+
_onProtocolChange ({ peerInfo, protocols }) {
81+
const existingPeer = this.peers.get(peerInfo.id.toB58String())
82+
83+
protocols.filter(protocol => this.multicodecs.includes(protocol))
84+
85+
// Not supporting the protocol anymore?
86+
if (existingPeer && protocols.filter(protocol => this.multicodecs.includes(protocol)).length === 0) {
87+
this._onDisconnect({
88+
peerInfo
89+
})
90+
}
91+
92+
// New to protocol support
93+
for (const protocol of protocols) {
94+
if (this.multicodecs.includes(protocol)) {
95+
this.tryToConnect(peerInfo, this.registrar.getPeerConnection(peerInfo))
96+
return
97+
}
98+
}
99+
}
100+
}
101+
102+
module.exports = Topology

src/index.js

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
'use strict'
22

33
const FSM = require('fsm-event')
4-
const EventEmitter = require('events').EventEmitter
4+
const { EventEmitter } = require('events')
55
const debug = require('debug')
66
const log = debug('libp2p')
77
log.error = debug('libp2p:error')
88
const errCode = require('err-code')
99
const promisify = require('promisify-es6')
1010

1111
const each = require('async/each')
12-
const nextTick = require('async/nextTick')
1312

14-
const PeerBook = require('peer-book')
1513
const PeerInfo = require('peer-info')
1614
const multiaddr = require('multiaddr')
1715
const Switch = require('./switch')
@@ -29,6 +27,8 @@ const { codes } = require('./errors')
2927
const Dialer = require('./dialer')
3028
const TransportManager = require('./transport-manager')
3129
const Upgrader = require('./upgrader')
30+
const Registrar = require('./registrar')
31+
const PeerStore = require('./peer-store')
3232

3333
const notStarted = (action, state) => {
3434
return errCode(
@@ -54,25 +54,31 @@ class Libp2p extends EventEmitter {
5454

5555
this.datastore = this._options.datastore
5656
this.peerInfo = this._options.peerInfo
57-
this.peerBook = this._options.peerBook || new PeerBook()
5857

5958
this._modules = this._options.modules
6059
this._config = this._options.config
6160
this._transport = [] // Transport instances/references
6261
this._discovery = [] // Discovery service instances/references
6362

63+
this.peerStore = new PeerStore()
64+
6465
// create the switch, and listen for errors
65-
this._switch = new Switch(this.peerInfo, this.peerBook, this._options.switch)
66+
this._switch = new Switch(this.peerInfo, this.peerStore, this._options.switch)
6667

6768
// Setup the Upgrader
6869
this.upgrader = new Upgrader({
6970
localPeer: this.peerInfo.id,
7071
onConnection: (connection) => {
7172
const peerInfo = getPeerInfo(connection.remotePeer)
73+
74+
this.peerStore.put(peerInfo)
75+
this.registrar.onConnect(peerInfo, connection)
7276
this.emit('peer:connect', peerInfo)
7377
},
7478
onConnectionEnd: (connection) => {
7579
const peerInfo = getPeerInfo(connection.remotePeer)
80+
81+
this.registrar.onDisconnect(peerInfo)
7682
this.emit('peer:disconnect', peerInfo)
7783
}
7884
})
@@ -106,6 +112,10 @@ class Libp2p extends EventEmitter {
106112
transportManager: this.transportManager
107113
})
108114

115+
this.registrar = new Registrar(this.peerStore)
116+
this.handle = this.handle.bind(this)
117+
this.registrar.handle = this.handle
118+
109119
// Attach private network protector
110120
if (this._modules.connProtector) {
111121
this._switch.protector = this._modules.connProtector
@@ -124,7 +134,7 @@ class Libp2p extends EventEmitter {
124134
}
125135

126136
// start pubsub
127-
if (this._modules.pubsub && this._config.pubsub.enabled !== false) {
137+
if (this._modules.pubsub) {
128138
this.pubsub = pubsub(this, this._modules.pubsub, this._config.pubsub)
129139
}
130140

@@ -179,7 +189,7 @@ class Libp2p extends EventEmitter {
179189

180190
// Once we start, emit and dial any peers we may have already discovered
181191
this.state.on('STARTED', () => {
182-
this.peerBook.getAllArray().forEach((peerInfo) => {
192+
this.peerStore.getAllArray().forEach((peerInfo) => {
183193
this.emit('peer:discovery', peerInfo)
184194
this._maybeConnect(peerInfo)
185195
})
@@ -228,6 +238,7 @@ class Libp2p extends EventEmitter {
228238
this.state('stop')
229239

230240
try {
241+
this.pubsub && await this.pubsub.stop()
231242
await this.transportManager.close()
232243
await this._switch.stop()
233244
} catch (err) {
@@ -245,7 +256,7 @@ class Libp2p extends EventEmitter {
245256

246257
/**
247258
* Dials to the provided peer. If successful, the `PeerInfo` of the
248-
* peer will be added to the nodes `PeerBook`
259+
* peer will be added to the nodes `PeerStore`
249260
*
250261
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
251262
* @param {object} options
@@ -258,7 +269,7 @@ class Libp2p extends EventEmitter {
258269

259270
/**
260271
* Dials to the provided peer and handshakes with the given protocol.
261-
* If successful, the `PeerInfo` of the peer will be added to the nodes `PeerBook`,
272+
* If successful, the `PeerInfo` of the peer will be added to the nodes `PeerStore`,
262273
* and the `Connection` will be sent in the callback
263274
*
264275
* @async
@@ -279,7 +290,13 @@ class Libp2p extends EventEmitter {
279290

280291
// If a protocol was provided, create a new stream
281292
if (protocols) {
282-
return connection.newStream(protocols)
293+
const stream = await connection.newStream(protocols)
294+
const peerInfo = getPeerInfo(connection.remotePeer)
295+
296+
peerInfo.protocols.add(stream.protocol)
297+
this.peerStore.put(peerInfo)
298+
299+
return stream
283300
}
284301

285302
return connection
@@ -350,10 +367,16 @@ class Libp2p extends EventEmitter {
350367
const multiaddrs = this.peerInfo.multiaddrs.toArray()
351368

352369
// Start parallel tasks
370+
const tasks = [
371+
this.transportManager.listen(multiaddrs)
372+
]
373+
374+
if (this._config.pubsub.enabled) {
375+
this.pubsub && this.pubsub.start()
376+
}
377+
353378
try {
354-
await Promise.all([
355-
this.transportManager.listen(multiaddrs)
356-
])
379+
await Promise.all(tasks)
357380
} catch (err) {
358381
log.error(err)
359382
this.emit('error', err)
@@ -369,12 +392,6 @@ class Libp2p extends EventEmitter {
369392
* the `peer:discovery` event. If auto dial is enabled for libp2p
370393
* and the current connection count is under the low watermark, the
371394
* peer will be dialed.
372-
*
373-
* TODO: If `peerBook.put` becomes centralized, https://github.com/libp2p/js-libp2p/issues/345,
374-
* it would be ideal if only new peers were emitted. Currently, with
375-
* other modules adding peers to the `PeerBook` we have no way of knowing
376-
* if a peer is new or not, so it has to be emitted.
377-
*
378395
* @private
379396
* @param {PeerInfo} peerInfo
380397
*/
@@ -383,7 +400,7 @@ class Libp2p extends EventEmitter {
383400
log.error(new Error(codes.ERR_DISCOVERED_SELF))
384401
return
385402
}
386-
peerInfo = this.peerBook.put(peerInfo)
403+
peerInfo = this.peerStore.put(peerInfo)
387404

388405
if (!this.isStarted()) return
389406

@@ -454,16 +471,15 @@ module.exports = Libp2p
454471
* Like `new Libp2p(options)` except it will create a `PeerInfo`
455472
* instance if one is not provided in options.
456473
* @param {object} options Libp2p configuration options
457-
* @param {function(Error, Libp2p)} callback
458-
* @returns {void}
474+
* @returns {Libp2p}
459475
*/
460-
module.exports.createLibp2p = promisify((options, callback) => {
476+
module.exports.create = async (options = {}) => {
461477
if (options.peerInfo) {
462-
return nextTick(callback, null, new Libp2p(options))
478+
return new Libp2p(options)
463479
}
464-
PeerInfo.create((err, peerInfo) => {
465-
if (err) return callback(err)
466-
options.peerInfo = peerInfo
467-
callback(null, new Libp2p(options))
468-
})
469-
})
480+
481+
const peerInfo = await PeerInfo.create()
482+
483+
options.peerInfo = peerInfo
484+
return new Libp2p(options)
485+
}

0 commit comments

Comments
 (0)