Skip to content

Commit 45c9b11

Browse files
richardschneiderdaviddias
authored andcommitted
fix: various floodsub issues (#51)
* fix: startup * Revert "fix: Published message field names (#49)" This reverts commit b8f66cd. * fix: close should always invoke callback * fix: avoid race conditions, by quietly ignoring unsub when shutdown * fix: only delete peer when connections match * fix: lint errors * fix: more work on connection shutdown * fix: RPC msg.from is now binary * fix: lint errors * fix: multiple connections to/from same peer This implements a refernce counting scheme. See libp2p/js-libp2p-floodsub#51 (comment) One test is still failing. * fix: topicCIDs => topicIDs * test: can not get this test to work! * fix: lint errors * fix: review changes * fix: lint errors on a comment, this going too far * test: get test working * test: add tests for publishing an array of messages * subscribe: polling for connectivity replaced by handling peer connection event * fix typo
1 parent dca9b1e commit 45c9b11

File tree

8 files changed

+197
-37
lines changed

8 files changed

+197
-37
lines changed

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
},
5555
"dependencies": {
5656
"async": "^2.6.0",
57+
"bs58": "^4.0.1",
5758
"debug": "^3.1.0",
5859
"length-prefixed-stream": "^1.5.1",
5960
"libp2p-crypto": "~0.10.3",

src/index.js

+68-26
Original file line numberDiff line numberDiff line change
@@ -59,17 +59,53 @@ class FloodSub extends EventEmitter {
5959
this._dialPeer = this._dialPeer.bind(this)
6060
}
6161

62+
_addPeer (peer) {
63+
const id = peer.info.id.toB58String()
64+
65+
/*
66+
Always use an existing peer.
67+
68+
What is happening here is: "If the other peer has already dialed to me, we already have
69+
an establish link between the two, what might be missing is a
70+
Connection specifically between me and that Peer"
71+
*/
72+
let existing = this.peers.get(id)
73+
if (existing) {
74+
log('already existing peer', id)
75+
++existing._references
76+
} else {
77+
log('new peer', id)
78+
this.peers.set(id, peer)
79+
existing = peer
80+
}
81+
82+
return existing
83+
}
84+
85+
_removePeer (peer) {
86+
const id = peer.info.id.toB58String()
87+
88+
log('remove', id, peer._references)
89+
// Only delete when no one else is referencing this peer.
90+
if (--peer._references === 0) {
91+
log('delete peer', id)
92+
this.peers.delete(id)
93+
}
94+
95+
return peer
96+
}
97+
6298
_dialPeer (peerInfo, callback) {
6399
callback = callback || function noop () {}
64100
const idB58Str = peerInfo.id.toB58String()
65-
log('dialing %s', idB58Str)
66101

67102
// If already have a PubSub conn, ignore
68103
const peer = this.peers.get(idB58Str)
69104
if (peer && peer.isConnected) {
70105
return setImmediate(() => callback())
71106
}
72107

108+
log('dialing %s', idB58Str)
73109
this.libp2p.dial(peerInfo, multicodec, (err, conn) => {
74110
if (err) {
75111
log.err(err)
@@ -82,13 +118,9 @@ class FloodSub extends EventEmitter {
82118

83119
_onDial (peerInfo, conn, callback) {
84120
const idB58Str = peerInfo.id.toB58String()
121+
log('connected', idB58Str)
85122

86-
// If already had a dial to me, just add the conn
87-
if (!this.peers.has(idB58Str)) {
88-
this.peers.set(idB58Str, new Peer(peerInfo))
89-
}
90-
91-
const peer = this.peers.get(idB58Str)
123+
const peer = this._addPeer(new Peer(peerInfo))
92124
peer.attachConnection(conn)
93125

94126
// Immediately send my own subscriptions to the newly established conn
@@ -104,24 +136,20 @@ class FloodSub extends EventEmitter {
104136
}
105137

106138
const idB58Str = peerInfo.id.toB58String()
139+
const peer = this._addPeer(new Peer(peerInfo))
107140

108-
if (!this.peers.has(idB58Str)) {
109-
log('new peer', idB58Str)
110-
this.peers.set(idB58Str, new Peer(peerInfo))
111-
}
112-
113-
this._processConnection(idB58Str, conn)
141+
this._processConnection(idB58Str, conn, peer)
114142
})
115143
}
116144

117-
_processConnection (idB58Str, conn) {
145+
_processConnection (idB58Str, conn, peer) {
118146
pull(
119147
conn,
120148
lp.decode(),
121149
pull.map((data) => pb.rpc.RPC.decode(data)),
122150
pull.drain(
123151
(rpc) => this._onRpc(idB58Str, rpc),
124-
(err) => this._onConnectionEnd(idB58Str, err)
152+
(err) => this._onConnectionEnd(idB58Str, peer, err)
125153
)
126154
)
127155
}
@@ -131,11 +159,12 @@ class FloodSub extends EventEmitter {
131159
return
132160
}
133161

162+
log('rpc from', idB58Str)
134163
const subs = rpc.subscriptions
135164
const msgs = rpc.msgs
136165

137166
if (msgs && msgs.length) {
138-
this._processRpcMessages(rpc.msgs)
167+
this._processRpcMessages(utils.normalizeInRpcMessages(rpc.msgs))
139168
}
140169

141170
if (subs && subs.length) {
@@ -164,13 +193,14 @@ class FloodSub extends EventEmitter {
164193
})
165194
}
166195

167-
_onConnectionEnd (idB58Str, err) {
196+
_onConnectionEnd (idB58Str, peer, err) {
168197
// socket hang up, means the one side canceled
169198
if (err && err.message !== 'socket hang up') {
170199
log.err(err)
171200
}
172201

173-
this.peers.delete(idB58Str)
202+
log('connection ended', idB58Str, err ? err.message : '')
203+
this._removePeer(peer)
174204
}
175205

176206
_emitMessages (topics, messages) {
@@ -191,7 +221,7 @@ class FloodSub extends EventEmitter {
191221
return
192222
}
193223

194-
peer.sendMessages(messages)
224+
peer.sendMessages(utils.normalizeOutRpcMessages(messages))
195225

196226
log('publish msgs on topics', topics, peer.info.id.toB58String())
197227
})
@@ -241,11 +271,15 @@ class FloodSub extends EventEmitter {
241271
this.libp2p.unhandle(multicodec)
242272
this.libp2p.removeListener('peer:connect', this._dialPeer)
243273

274+
log('stopping')
244275
asyncEach(this.peers.values(), (peer, cb) => peer.close(cb), (err) => {
245276
if (err) {
246277
return callback(err)
247278
}
279+
280+
log('stopped')
248281
this.peers = new Map()
282+
this.subscriptions = new Set()
249283
this.started = false
250284
callback()
251285
})
@@ -287,7 +321,7 @@ class FloodSub extends EventEmitter {
287321
this._emitMessages(topics, msgObjects)
288322

289323
// send to all the other peers
290-
this._forwardMessages(topics, messages.map(buildMessage))
324+
this._forwardMessages(topics, msgObjects)
291325
}
292326

293327
/**
@@ -303,14 +337,18 @@ class FloodSub extends EventEmitter {
303337

304338
topics.forEach((topic) => this.subscriptions.add(topic))
305339

306-
this.peers.forEach((peer) => checkIfReady(peer))
340+
this.peers.forEach((peer) => sendSubscriptionsOnceReady(peer))
307341
// make sure that FloodSub is already mounted
308-
function checkIfReady (peer) {
342+
function sendSubscriptionsOnceReady (peer) {
309343
if (peer && peer.isWritable) {
310-
peer.sendSubscriptions(topics)
311-
} else {
312-
setImmediate(checkIfReady.bind(peer))
344+
return peer.sendSubscriptions(topics)
345+
}
346+
const onConnection = () => {
347+
peer.removeListener('connection', onConnection)
348+
sendSubscriptionsOnceReady(peer)
313349
}
350+
peer.on('connection', onConnection)
351+
peer.once('close', () => peer.removeListener('connection', onConnection))
314352
}
315353
}
316354

@@ -321,7 +359,11 @@ class FloodSub extends EventEmitter {
321359
* @returns {undefined}
322360
*/
323361
unsubscribe (topics) {
324-
assert(this.started, 'FloodSub is not started')
362+
// Avoid race conditions, by quietly ignoring unsub when shutdown.
363+
if (!this.started) {
364+
return
365+
}
366+
325367
topics = ensureArray(topics)
326368

327369
topics.forEach((topic) => this.subscriptions.delete(topic))

src/message/rpc.proto.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ message RPC {
1010
}
1111
1212
message Message {
13-
optional string from = 1;
13+
optional bytes from = 1;
1414
optional bytes data = 2;
1515
optional bytes seqno = 3;
16-
repeated string topicIDs = 4; // CID of topic descriptor object
16+
repeated string topicIDs = 4;
1717
}
1818
}`

src/peer.js

+20-6
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,20 @@ const lp = require('pull-length-prefixed')
44
const Pushable = require('pull-pushable')
55
const pull = require('pull-stream')
66
const setImmediate = require('async/setImmediate')
7+
const EventEmitter = require('events')
78

89
const rpc = require('./message').rpc.RPC
910

1011
/**
1112
* The known state of a connected peer.
1213
*/
13-
class Peer {
14+
class Peer extends EventEmitter {
1415
/**
1516
* @param {PeerInfo} info
1617
*/
1718
constructor (info) {
19+
super()
20+
1821
/**
1922
* @type {PeerInfo}
2023
*/
@@ -31,6 +34,8 @@ class Peer {
3134
* @type {Pushable}
3235
*/
3336
this.stream = null
37+
38+
this._references = 1
3439
}
3540

3641
/**
@@ -80,8 +85,15 @@ class Peer {
8085
pull(
8186
this.stream,
8287
lp.encode(),
83-
conn
88+
conn,
89+
pull.onEnd(() => {
90+
this.conn = null
91+
this.stream = null
92+
this.emit('close')
93+
})
8494
)
95+
96+
this.emit('connection')
8597
}
8698

8799
_sendRawSubscriptions (topics, subscribe) {
@@ -155,16 +167,18 @@ class Peer {
155167
* @returns {undefined}
156168
*/
157169
close (callback) {
158-
if (!this.conn || !this.stream) {
159-
// no connection to close
160-
}
161-
// end the pushable pull-stream
170+
// Force removal of peer
171+
this._references = 1
172+
173+
// End the pushable
162174
if (this.stream) {
163175
this.stream.end()
164176
}
177+
165178
setImmediate(() => {
166179
this.conn = null
167180
this.stream = null
181+
this.emit('close')
168182
callback()
169183
})
170184
}

src/utils.js

+27
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict'
22

33
const crypto = require('libp2p-crypto')
4+
const bs58 = require('bs58')
45

56
exports = module.exports
67

@@ -66,3 +67,29 @@ exports.ensureArray = (maybeArray) => {
6667

6768
return maybeArray
6869
}
70+
71+
exports.normalizeInRpcMessages = (messages) => {
72+
if (!messages) {
73+
return messages
74+
}
75+
return messages.map((msg) => {
76+
const m = Object.assign({}, msg)
77+
if (Buffer.isBuffer(msg.from)) {
78+
m.from = bs58.encode(msg.from)
79+
}
80+
return m
81+
})
82+
}
83+
84+
exports.normalizeOutRpcMessages = (messages) => {
85+
if (!messages) {
86+
return messages
87+
}
88+
return messages.map((msg) => {
89+
const m = Object.assign({}, msg)
90+
if (typeof msg.from === 'string' || msg.from instanceof String) {
91+
m.from = bs58.decode(msg.from)
92+
}
93+
return m
94+
})
95+
}

test/2-nodes.js

+27-3
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,30 @@ describe('basics between 2 nodes', () => {
142142
_times(10, () => fsB.publish('Z', new Buffer('banana')))
143143
})
144144

145+
it('Publish 10 msg to a topic:Z in nodeB as array', (done) => {
146+
let counter = 0
147+
148+
fsB.once('Z', shouldNotHappen)
149+
150+
fsA.on('Z', receivedMsg)
151+
152+
function receivedMsg (msg) {
153+
expect(msg.data.toString()).to.equal('banana')
154+
expect(msg.from).to.be.eql(fsB.libp2p.peerInfo.id.toB58String())
155+
expect(Buffer.isBuffer(msg.seqno)).to.be.true()
156+
expect(msg.topicIDs).to.be.eql(['Z'])
157+
158+
if (++counter === 10) {
159+
fsA.removeListener('Z', receivedMsg)
160+
done()
161+
}
162+
}
163+
164+
let msgs = []
165+
_times(10, () => msgs.push(new Buffer('banana')))
166+
fsB.publish('Z', msgs)
167+
})
168+
145169
it('Unsubscribe from topic:Z in nodeA', (done) => {
146170
fsA.unsubscribe('Z')
147171
expect(fsA.subscriptions.size).to.equal(0)
@@ -291,11 +315,11 @@ describe('basics between 2 nodes', () => {
291315
nodeA.dial(nodeB.peerInfo, (err) => {
292316
expect(err).to.not.exist()
293317
setTimeout(() => {
294-
expect(fsA.peers.size).to.equal(1)
295-
expect(fsB.peers.size).to.equal(1)
318+
expect(first(fsA.peers)._references).to.equal(2)
319+
expect(first(fsB.peers)._references).to.equal(2)
296320

297321
fsA.stop(() => setTimeout(() => {
298-
expect(fsB.peers.size).to.equal(0)
322+
expect(first(fsB.peers)._references).to.equal(1)
299323
done()
300324
}, 250))
301325
}, 1000)

0 commit comments

Comments
 (0)