3
3
const EventEmitter = require ( 'events' ) . EventEmitter
4
4
const assert = require ( 'assert' )
5
5
6
- const setImmediate = require ( 'async/setImmediate' )
7
6
const each = require ( 'async/each' )
8
7
const series = require ( 'async/series' )
8
+ const parallel = require ( 'async/parallel' )
9
9
10
10
const PeerBook = require ( 'peer-book' )
11
11
const Switch = require ( 'libp2p-switch' )
@@ -18,6 +18,7 @@ const contentRouting = require('./content-routing')
18
18
const dht = require ( './dht' )
19
19
const pubsub = require ( './pubsub' )
20
20
const getPeerInfo = require ( './get-peer-info' )
21
+ const validateConfig = require ( './config' ) . validate
21
22
22
23
exports = module . exports
23
24
@@ -26,15 +27,15 @@ const NOT_STARTED_ERROR_MESSAGE = 'The libp2p node is not started yet'
26
27
class Node extends EventEmitter {
27
28
constructor ( _options ) {
28
29
super ( )
29
- assert ( _options . modules , 'requires modules to equip libp2p with features' )
30
- assert ( _options . peerInfo , 'requires a PeerInfo instance' )
30
+ // validateConfig will ensure the config is correct,
31
+ // and add default values where appropriate
32
+ _options = validateConfig ( _options )
31
33
32
34
this . peerInfo = _options . peerInfo
33
35
this . peerBook = _options . peerBook || new PeerBook ( )
34
36
35
37
this . _modules = _options . modules
36
- // TODO populate with default config, if any
37
- this . _config = _options . config || { }
38
+ this . _config = _options . config
38
39
this . _isStarted = false
39
40
this . _transport = [ ] // Transport instances/references
40
41
this . _discovery = [ ] // Discovery service instances/references
@@ -46,7 +47,6 @@ class Node extends EventEmitter {
46
47
// Attach stream multiplexers
47
48
if ( this . _modules . streamMuxer ) {
48
49
let muxers = this . _modules . streamMuxer
49
- muxers = Array . isArray ( muxers ) ? muxers : [ muxers ]
50
50
muxers . forEach ( ( muxer ) => this . _switch . connection . addStreamMuxer ( muxer ) )
51
51
52
52
// If muxer exists
@@ -70,16 +70,13 @@ class Node extends EventEmitter {
70
70
// Attach crypto channels
71
71
if ( this . _modules . connEncryption ) {
72
72
let cryptos = this . _modules . connEncryption
73
- cryptos = Array . isArray ( cryptos ) ? cryptos : [ cryptos ]
74
73
cryptos . forEach ( ( crypto ) => {
75
74
this . _switch . connection . crypto ( crypto . tag , crypto . encrypt )
76
75
} )
77
76
}
78
77
79
78
// dht provided components (peerRouting, contentRouting, dht)
80
- if ( this . _config . EXPERIMENTAL &&
81
- this . _config . EXPERIMENTAL . dht &&
82
- this . _modules . dht ) {
79
+ if ( this . _config . EXPERIMENTAL . dht ) {
83
80
const DHT = this . _modules . dht
84
81
this . _dht = new DHT ( this . _switch , {
85
82
kBucketSize : this . _config . dht . kBucketSize || 20 ,
@@ -91,14 +88,13 @@ class Node extends EventEmitter {
91
88
92
89
// enable/disable pubsub
93
90
if ( this . _config . EXPERIMENTAL && this . _config . EXPERIMENTAL . pubsub ) {
94
- // TODO only enable PubSub if flag is set to true
91
+ this . pubsub = pubsub ( this )
95
92
}
96
93
97
94
// Attach remaining APIs
98
95
this . peerRouting = peerRouting ( this )
99
96
this . contentRouting = contentRouting ( this )
100
97
this . dht = dht ( this )
101
- this . pubsub = pubsub ( this )
102
98
103
99
this . _getPeerInfo = getPeerInfo ( this )
104
100
@@ -195,7 +191,7 @@ class Node extends EventEmitter {
195
191
( cb ) => {
196
192
// TODO: chicken-and-egg problem #2:
197
193
// have to set started here because FloodSub requires libp2p is already started
198
- if ( this . _options !== false ) {
194
+ if ( this . _floodSub ) {
199
195
this . _floodSub . start ( cb )
200
196
} else {
201
197
cb ( )
@@ -225,17 +221,24 @@ class Node extends EventEmitter {
225
221
* Stop the libp2p node by closing its listeners and open connections
226
222
*/
227
223
stop ( callback ) {
228
- if ( this . _modules . peerDiscovery ) {
229
- this . _discovery . forEach ( ( d ) => {
230
- setImmediate ( ( ) => d . stop ( ( ) => { } ) )
231
- } )
232
- }
233
-
234
224
series ( [
235
225
( cb ) => {
236
- if ( this . _floodSub . started ) {
237
- this . _floodSub . stop ( cb )
226
+ if ( this . _modules . peerDiscovery ) {
227
+ // stop all discoveries before continuing with shutdown
228
+ return parallel (
229
+ this . _discovery . map ( ( d ) => {
230
+ return ( _cb ) => d . stop ( ( ) => { _cb ( ) } )
231
+ } ) ,
232
+ cb
233
+ )
234
+ }
235
+ cb ( )
236
+ } ,
237
+ ( cb ) => {
238
+ if ( this . _floodSub ) {
239
+ return this . _floodSub . stop ( cb )
238
240
}
241
+ cb ( )
239
242
} ,
240
243
( cb ) => {
241
244
if ( this . _dht ) {
@@ -303,7 +306,9 @@ class Node extends EventEmitter {
303
306
}
304
307
305
308
ping ( peer , callback ) {
306
- assert ( this . isStarted ( ) , NOT_STARTED_ERROR_MESSAGE )
309
+ if ( ! this . isStarted ( ) ) {
310
+ return callback ( new Error ( NOT_STARTED_ERROR_MESSAGE ) )
311
+ }
307
312
308
313
this . _getPeerInfo ( peer , ( err , peerInfo ) => {
309
314
if ( err ) { return callback ( err ) }
0 commit comments