1
1
'use strict'
2
2
3
3
const FSM = require ( 'fsm-event' )
4
- const EventEmitter = require ( 'events' ) . EventEmitter
4
+ const { EventEmitter } = require ( 'events' )
5
5
const debug = require ( 'debug' )
6
6
const log = debug ( 'libp2p' )
7
7
log . error = debug ( 'libp2p:error' )
8
8
const errCode = require ( 'err-code' )
9
9
const promisify = require ( 'promisify-es6' )
10
10
11
11
const each = require ( 'async/each' )
12
- const nextTick = require ( 'async/nextTick' )
13
12
14
- const PeerBook = require ( 'peer-book' )
15
13
const PeerInfo = require ( 'peer-info' )
16
14
const multiaddr = require ( 'multiaddr' )
17
15
const Switch = require ( './switch' )
@@ -29,6 +27,8 @@ const { codes } = require('./errors')
29
27
const Dialer = require ( './dialer' )
30
28
const TransportManager = require ( './transport-manager' )
31
29
const Upgrader = require ( './upgrader' )
30
+ const Registrar = require ( './registrar' )
31
+ const PeerStore = require ( './peer-store' )
32
32
33
33
const notStarted = ( action , state ) => {
34
34
return errCode (
@@ -54,25 +54,31 @@ class Libp2p extends EventEmitter {
54
54
55
55
this . datastore = this . _options . datastore
56
56
this . peerInfo = this . _options . peerInfo
57
- this . peerBook = this . _options . peerBook || new PeerBook ( )
58
57
59
58
this . _modules = this . _options . modules
60
59
this . _config = this . _options . config
61
60
this . _transport = [ ] // Transport instances/references
62
61
this . _discovery = [ ] // Discovery service instances/references
63
62
63
+ this . peerStore = new PeerStore ( )
64
+
64
65
// create the switch, and listen for errors
65
- this . _switch = new Switch ( this . peerInfo , this . peerBook , this . _options . switch )
66
+ this . _switch = new Switch ( this . peerInfo , this . peerStore , this . _options . switch )
66
67
67
68
// Setup the Upgrader
68
69
this . upgrader = new Upgrader ( {
69
70
localPeer : this . peerInfo . id ,
70
71
onConnection : ( connection ) => {
71
72
const peerInfo = getPeerInfo ( connection . remotePeer )
73
+
74
+ this . peerStore . put ( peerInfo )
75
+ this . registrar . onConnect ( peerInfo , connection )
72
76
this . emit ( 'peer:connect' , peerInfo )
73
77
} ,
74
78
onConnectionEnd : ( connection ) => {
75
79
const peerInfo = getPeerInfo ( connection . remotePeer )
80
+
81
+ this . registrar . onDisconnect ( peerInfo )
76
82
this . emit ( 'peer:disconnect' , peerInfo )
77
83
}
78
84
} )
@@ -106,6 +112,10 @@ class Libp2p extends EventEmitter {
106
112
transportManager : this . transportManager
107
113
} )
108
114
115
+ this . registrar = new Registrar ( this . peerStore )
116
+ this . handle = this . handle . bind ( this )
117
+ this . registrar . handle = this . handle
118
+
109
119
// Attach private network protector
110
120
if ( this . _modules . connProtector ) {
111
121
this . _switch . protector = this . _modules . connProtector
@@ -124,8 +134,10 @@ class Libp2p extends EventEmitter {
124
134
}
125
135
126
136
// start pubsub
127
- if ( this . _modules . pubsub && this . _config . pubsub . enabled !== false ) {
137
+ if ( this . _modules . pubsub ) {
128
138
this . pubsub = pubsub ( this , this . _modules . pubsub , this . _config . pubsub )
139
+ // const Pubsub = this._modules.pubsub
140
+ // this.pubsub = new Pubsub(this.peerInfo, this.registrar, this._config.pubsub)
129
141
}
130
142
131
143
// Attach remaining APIs
@@ -179,7 +191,7 @@ class Libp2p extends EventEmitter {
179
191
180
192
// Once we start, emit and dial any peers we may have already discovered
181
193
this . state . on ( 'STARTED' , ( ) => {
182
- this . peerBook . getAllArray ( ) . forEach ( ( peerInfo ) => {
194
+ this . peerStore . getAllArray ( ) . forEach ( ( peerInfo ) => {
183
195
this . emit ( 'peer:discovery' , peerInfo )
184
196
this . _maybeConnect ( peerInfo )
185
197
} )
@@ -228,6 +240,7 @@ class Libp2p extends EventEmitter {
228
240
this . state ( 'stop' )
229
241
230
242
try {
243
+ this . pubsub && await this . pubsub . stop ( )
231
244
await this . transportManager . close ( )
232
245
await this . _switch . stop ( )
233
246
} catch ( err ) {
@@ -245,7 +258,7 @@ class Libp2p extends EventEmitter {
245
258
246
259
/**
247
260
* Dials to the provided peer. If successful, the `PeerInfo` of the
248
- * peer will be added to the nodes `PeerBook `
261
+ * peer will be added to the nodes `PeerStore `
249
262
*
250
263
* @param {PeerInfo|PeerId|Multiaddr|string } peer The peer to dial
251
264
* @param {object } options
@@ -258,7 +271,7 @@ class Libp2p extends EventEmitter {
258
271
259
272
/**
260
273
* Dials to the provided peer and handshakes with the given protocol.
261
- * If successful, the `PeerInfo` of the peer will be added to the nodes `PeerBook `,
274
+ * If successful, the `PeerInfo` of the peer will be added to the nodes `PeerStore `,
262
275
* and the `Connection` will be sent in the callback
263
276
*
264
277
* @async
@@ -279,7 +292,13 @@ class Libp2p extends EventEmitter {
279
292
280
293
// If a protocol was provided, create a new stream
281
294
if ( protocols ) {
282
- return connection . newStream ( protocols )
295
+ const stream = await connection . newStream ( protocols )
296
+ const peerInfo = getPeerInfo ( connection . remotePeer )
297
+
298
+ peerInfo . protocols . add ( stream . protocol )
299
+ this . peerStore . put ( peerInfo )
300
+
301
+ return stream
283
302
}
284
303
285
304
return connection
@@ -350,10 +369,16 @@ class Libp2p extends EventEmitter {
350
369
const multiaddrs = this . peerInfo . multiaddrs . toArray ( )
351
370
352
371
// Start parallel tasks
372
+ const tasks = [
373
+ this . transportManager . listen ( multiaddrs )
374
+ ]
375
+
376
+ if ( this . _config . pubsub . enabled ) {
377
+ this . pubsub && this . pubsub . start ( )
378
+ }
379
+
353
380
try {
354
- await Promise . all ( [
355
- this . transportManager . listen ( multiaddrs )
356
- ] )
381
+ await Promise . all ( tasks )
357
382
} catch ( err ) {
358
383
log . error ( err )
359
384
this . emit ( 'error' , err )
@@ -369,12 +394,6 @@ class Libp2p extends EventEmitter {
369
394
* the `peer:discovery` event. If auto dial is enabled for libp2p
370
395
* and the current connection count is under the low watermark, the
371
396
* peer will be dialed.
372
- *
373
- * TODO: If `peerBook.put` becomes centralized, https://github.com/libp2p/js-libp2p/issues/345,
374
- * it would be ideal if only new peers were emitted. Currently, with
375
- * other modules adding peers to the `PeerBook` we have no way of knowing
376
- * if a peer is new or not, so it has to be emitted.
377
- *
378
397
* @private
379
398
* @param {PeerInfo } peerInfo
380
399
*/
@@ -383,7 +402,7 @@ class Libp2p extends EventEmitter {
383
402
log . error ( new Error ( codes . ERR_DISCOVERED_SELF ) )
384
403
return
385
404
}
386
- peerInfo = this . peerBook . put ( peerInfo )
405
+ peerInfo = this . peerStore . put ( peerInfo )
387
406
388
407
if ( ! this . isStarted ( ) ) return
389
408
@@ -454,16 +473,15 @@ module.exports = Libp2p
454
473
* Like `new Libp2p(options)` except it will create a `PeerInfo`
455
474
* instance if one is not provided in options.
456
475
* @param {object } options Libp2p configuration options
457
- * @param {function(Error, Libp2p) } callback
458
- * @returns {void }
476
+ * @returns {Libp2p }
459
477
*/
460
- module . exports . createLibp2p = promisify ( ( options , callback ) => {
478
+ module . exports . create = async ( options = { } ) => {
461
479
if ( options . peerInfo ) {
462
- return nextTick ( callback , null , new Libp2p ( options ) )
480
+ return new Libp2p ( options )
463
481
}
464
- PeerInfo . create ( ( err , peerInfo ) => {
465
- if ( err ) return callback ( err )
466
- options . peerInfo = peerInfo
467
- callback ( null , new Libp2p ( options ) )
468
- } )
469
- } )
482
+
483
+ const peerInfo = await PeerInfo . create ( )
484
+
485
+ options . peerInfo = peerInfo
486
+ return new Libp2p ( options )
487
+ }
0 commit comments