Skip to content

Commit 3303ad0

Browse files
dirkmcvasco-santos
authored andcommitted
fix: prevent double dialing same peer (#63)
1 parent a767cee commit 3303ad0

File tree

3 files changed

+145
-0
lines changed

3 files changed

+145
-0
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
"aegir": "^17.1.1",
4242
"benchmark": "^2.1.4",
4343
"chai": "^4.2.0",
44+
"chai-spies": "^1.0.0",
4445
"dirty-chai": "^2.0.1",
4546
"libp2p": "~0.24.1",
4647
"libp2p-secio": "~0.10.1",

src/base.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ class BaseProtocol extends EventEmitter {
3838
*/
3939
this.peers = new Map()
4040

41+
// Dials that are currently in progress
42+
this._dials = new Set()
43+
4144
this._onConnection = this._onConnection.bind(this)
4245
this._dialPeer = this._dialPeer.bind(this)
4346
}
@@ -88,13 +91,29 @@ class BaseProtocol extends EventEmitter {
8891
return setImmediate(() => callback())
8992
}
9093

94+
// If already dialing this peer, ignore
95+
if (this._dials.has(idB58Str)) {
96+
this.log('already dialing %s, ignoring dial attempt', idB58Str)
97+
return setImmediate(() => callback())
98+
}
99+
this._dials.add(idB58Str)
100+
91101
this.log('dialing %s', idB58Str)
92102
this.libp2p.dialProtocol(peerInfo, this.multicodec, (err, conn) => {
103+
this.log('dial to %s complete', idB58Str)
93104
if (err) {
94105
this.log.err(err)
95106
return callback()
96107
}
97108

109+
// If the dial is not in the set, it means that floodsub has been
110+
// stopped, so we should just bail out
111+
if (!this._dials.has(idB58Str)) {
112+
this.log('floodsub was stopped, not processing dial to %s', idB58Str)
113+
return callback()
114+
}
115+
this._dials.delete(idB58Str)
116+
98117
this._onDial(peerInfo, conn, callback)
99118
})
100119
}
@@ -149,6 +168,7 @@ class BaseProtocol extends EventEmitter {
149168
if (this.started) {
150169
return setImmediate(() => callback(new Error('already started')))
151170
}
171+
this.log('starting')
152172

153173
this.libp2p.handle(this.multicodec, this._onConnection)
154174

@@ -160,6 +180,7 @@ class BaseProtocol extends EventEmitter {
160180

161181
asyncEach(peerInfos, (peer, cb) => this._dialPeer(peer, cb), (err) => {
162182
setImmediate(() => {
183+
this.log('started')
163184
this.started = true
164185
callback(err)
165186
})
@@ -181,6 +202,9 @@ class BaseProtocol extends EventEmitter {
181202
this.libp2p.unhandle(this.multicodec)
182203
this.libp2p.removeListener('peer:connect', this._dialPeer)
183204

205+
// Prevent any dials that are in flight from being processed
206+
this._dials = new Set()
207+
184208
this.log('stopping')
185209
asyncEach(this.peers.values(), (peer, cb) => peer.close(cb), (err) => {
186210
if (err) {

test/2-nodes.js

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
const chai = require('chai')
66
chai.use(require('dirty-chai'))
7+
chai.use(require('chai-spies'))
78
const expect = chai.expect
89
const parallel = require('async/parallel')
910
const series = require('async/series')
@@ -390,6 +391,125 @@ describe('basics between 2 nodes', () => {
390391
], done)
391392
})
392393
})
394+
395+
describe('prevent concurrent dials', () => {
396+
let sandbox
397+
let nodeA
398+
let nodeB
399+
let fsA
400+
let fsB
401+
402+
before((done) => {
403+
sandbox = chai.spy.sandbox()
404+
405+
series([
406+
(cb) => createNode('/ip4/127.0.0.1/tcp/0', cb),
407+
(cb) => createNode('/ip4/127.0.0.1/tcp/0', cb)
408+
], (err, nodes) => {
409+
if (err) return done(err)
410+
411+
nodeA = nodes[0]
412+
nodeB = nodes[1]
413+
414+
// Put node B in node A's peer book
415+
nodeA.peerBook.put(nodeB.peerInfo)
416+
417+
fsA = new FloodSub(nodeA)
418+
fsB = new FloodSub(nodeB)
419+
420+
fsB.start(done)
421+
})
422+
})
423+
424+
after((done) => {
425+
sandbox.restore()
426+
427+
parallel([
428+
(cb) => nodeA.stop(cb),
429+
(cb) => nodeB.stop(cb)
430+
], (ignoreErr) => {
431+
done()
432+
})
433+
})
434+
435+
it('does not dial twice to same peer', (done) => {
436+
sandbox.on(fsA, ['_onDial'])
437+
438+
// When node A starts, it will dial all peers in its peer book, which
439+
// is just peer B
440+
fsA.start(startComplete)
441+
442+
// Simulate a connection coming in from peer B at the same time. This
443+
// causes floodsub to dial peer B
444+
nodeA.emit('peer:connect', nodeB.peerInfo)
445+
446+
function startComplete () {
447+
// Check that only one dial was made
448+
setTimeout(() => {
449+
expect(fsA._onDial).to.have.been.called.once()
450+
done()
451+
}, 1000)
452+
}
453+
})
454+
})
455+
456+
describe('prevent processing dial after stop', () => {
457+
let sandbox
458+
let nodeA
459+
let nodeB
460+
let fsA
461+
let fsB
462+
463+
before((done) => {
464+
sandbox = chai.spy.sandbox()
465+
466+
series([
467+
(cb) => createNode('/ip4/127.0.0.1/tcp/0', cb),
468+
(cb) => createNode('/ip4/127.0.0.1/tcp/0', cb)
469+
], (err, nodes) => {
470+
if (err) return done(err)
471+
472+
nodeA = nodes[0]
473+
nodeB = nodes[1]
474+
475+
fsA = new FloodSub(nodeA)
476+
fsB = new FloodSub(nodeB)
477+
478+
parallel([
479+
(cb) => fsA.start(cb),
480+
(cb) => fsB.start(cb)
481+
], done)
482+
})
483+
})
484+
485+
after((done) => {
486+
sandbox.restore()
487+
488+
parallel([
489+
(cb) => nodeA.stop(cb),
490+
(cb) => nodeB.stop(cb)
491+
], (ignoreErr) => {
492+
done()
493+
})
494+
})
495+
496+
it('does not process dial after stop', (done) => {
497+
sandbox.on(fsA, ['_onDial'])
498+
499+
// Simulate a connection coming in from peer B at the same time. This
500+
// causes floodsub to dial peer B
501+
nodeA.emit('peer:connect', nodeB.peerInfo)
502+
503+
// Stop floodsub before the dial can complete
504+
fsA.stop(() => {
505+
// Check that the dial was not processed
506+
setTimeout(() => {
507+
expect(fsA._onDial).to.not.have.been.called()
508+
done()
509+
}, 1000)
510+
})
511+
})
512+
})
393513
})
394514

395515
function shouldNotHappen (msg) {

0 commit comments

Comments
 (0)