Skip to content

Commit 65801dc

Browse files
authored
Merge pull request #15 from libp2p/feat/start-and-stop
feat/start and stop
2 parents 181f1fc + 61ceb8c commit 65801dc

File tree

5 files changed

+203
-91
lines changed

5 files changed

+203
-91
lines changed

README.md

+10-5
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,17 @@ const FloodSub = require('libp2p-floodsub')
3535

3636
const fsub = new FloodSub(libp2pNodeInstance)
3737

38-
fsub.on('fruit', (data) => {
39-
console.log(data)
38+
fsub.start((err) => {
39+
if (err) {
40+
console.log('Upsy', err)
41+
}
42+
fsub.on('fruit', (data) => {
43+
console.log(data)
44+
})
45+
fsub.subscribe('fruit')
46+
47+
fsub.publish('fruit', new Buffer('banana'))
4048
})
41-
fsub.subscribe('fruit')
42-
43-
fsub.publish('fruit', new Buffer('banana'))
4449
```
4550

4651
## API

src/index.js

+69-16
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ const TimeCache = require('time-cache')
55
const values = require('lodash.values')
66
const pull = require('pull-stream')
77
const lp = require('pull-length-prefixed')
8+
const assert = require('assert')
9+
const asyncEach = require('async/each')
810

911
const Peer = require('./peer')
1012
const utils = require('./utils')
@@ -16,20 +18,20 @@ const multicodec = config.multicodec
1618
const ensureArray = utils.ensureArray
1719

1820
/**
19-
* PubSubGossip, also known as pubsub-flood or just dumbsub,
20-
* this implementation of pubsub focused on delivering an API
21-
* for Publish/Subscribe, but with no CastTree Forming
21+
* FloodSub (aka dumbsub is an implementation of pubsub focused on
22+
* delivering an API for Publish/Subscribe, but with no CastTree Forming
2223
* (it just floods the network).
2324
*/
2425
class FloodSub extends EventEmitter {
2526
/**
2627
* @param {Object} libp2p
27-
* @returns {PubSubGossip}
28+
* @returns {FloodSub}
2829
*/
2930
constructor (libp2p) {
3031
super()
3132

3233
this.libp2p = libp2p
34+
this.started = false
3335

3436
/**
3537
* Time based cache for sequence numbers.
@@ -51,18 +53,8 @@ class FloodSub extends EventEmitter {
5153
*/
5254
this.subscriptions = new Set()
5355

54-
const onConnection = this._onConnection.bind(this)
55-
this.libp2p.handle(multicodec, onConnection)
56-
57-
// Speed up any new peer that comes in my way
58-
this.libp2p.swarm.on('peer-mux-established', (p) => {
59-
this._dialPeer(p)
60-
})
61-
62-
// Dial already connected peers
63-
values(this.libp2p.peerBook.getAll()).forEach((p) => {
64-
this._dialPeer(p)
65-
})
56+
this._onConnection = this._onConnection.bind(this)
57+
this._dialPeer = this._dialPeer.bind(this)
6658
}
6759

6860
_dialPeer (peerInfo) {
@@ -199,6 +191,62 @@ class FloodSub extends EventEmitter {
199191
})
200192
}
201193

194+
/**
195+
* Mounts the floodsub protocol onto the libp2p node and sends our
196+
* subscriptions to every peer conneceted
197+
*
198+
* @param {Function} callback
199+
* @returns {undefined}
200+
*
201+
*/
202+
start (callback) {
203+
if (this.started) {
204+
return setImmediate(() => callback(new Error('already started')))
205+
}
206+
207+
this.libp2p.handle(multicodec, this._onConnection)
208+
209+
// Speed up any new peer that comes in my way
210+
this.libp2p.swarm.on('peer-mux-established', this._dialPeer)
211+
212+
// Dial already connected peers
213+
const peerInfos = values(this.libp2p.peerBook.getAll())
214+
215+
peerInfos.forEach((peerInfo) => {
216+
this._dialPeer(peerInfo)
217+
})
218+
219+
setImmediate(() => {
220+
this.started = true
221+
callback()
222+
})
223+
}
224+
225+
/**
226+
* Unmounts the floodsub protocol and shuts down every connection
227+
*
228+
* @param {Function} callback
229+
* @returns {undefined}
230+
*
231+
*/
232+
stop (callback) {
233+
if (!this.started) {
234+
return setImmediate(() => callback(new Error('not started yet')))
235+
}
236+
237+
this.libp2p.unhandle(multicodec)
238+
this.libp2p.swarm.removeListener('peer-mux-established', this._dialPeer)
239+
240+
asyncEach(this.peers.values(), (peer, cb) => peer.close(cb), (err) => {
241+
if (err) {
242+
return callback(err)
243+
}
244+
this.peers = new Map()
245+
this.started = false
246+
callback()
247+
})
248+
}
249+
202250
/**
203251
* Publish messages to the given topics.
204252
*
@@ -208,6 +256,8 @@ class FloodSub extends EventEmitter {
208256
*
209257
*/
210258
publish (topics, messages) {
259+
assert(this.started, 'FloodSub is not started')
260+
211261
log('publish', topics, messages)
212262

213263
topics = ensureArray(topics)
@@ -243,6 +293,8 @@ class FloodSub extends EventEmitter {
243293
* @returns {undefined}
244294
*/
245295
subscribe (topics) {
296+
assert(this.started, 'FloodSub is not started')
297+
246298
topics = ensureArray(topics)
247299

248300
topics.forEach((topic) => {
@@ -261,6 +313,7 @@ class FloodSub extends EventEmitter {
261313
* @returns {undefined}
262314
*/
263315
unsubscribe (topics) {
316+
assert(this.started, 'FloodSub is not started')
264317
topics = ensureArray(topics)
265318

266319
topics.forEach((topic) => {

src/peer.js

+19
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,25 @@ class Peer {
146146
}
147147
})
148148
}
149+
150+
/**
151+
* Closes the open connection to peer
152+
*
153+
* @param {Function} callback
154+
* @returns {undefined}
155+
*/
156+
close (callback) {
157+
if (!this.conn || !this.stream) {
158+
// no connection to close
159+
}
160+
// end the pushable pull-stream
161+
this.stream.end()
162+
setImmediate(() => {
163+
this.conn = null
164+
this.stream = null
165+
callback()
166+
})
167+
}
149168
}
150169

151170
module.exports = Peer

0 commit comments

Comments
 (0)