Skip to content

Commit 20175dd

Browse files
authored
feat: global dial queue (libp2p#314)
* feat: add a general queue to limit all dials * fix: improve queue count logic and add better abort * feat: add a basic blacklist * fix: abort dial queue on error instead of stop * feat: add a crude priority lane * test: add test for blacklist error * fix: make blacklist and max dials configurable * refactor: blacklist after callback * test: improve testings around blacklisting
1 parent 1e23eb2 commit 20175dd

File tree

9 files changed

+158
-22
lines changed

9 files changed

+158
-22
lines changed

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ const sw = new switch(peerInfo , peerBook [, options])
6161

6262
If defined, `options` should be an object with the following keys and respective values:
6363

64+
- `blacklistTTL`: - number of ms a peer should not be dialable to after it errors. Defaults to `120000`(120 seconds)
65+
- `maxParallelDials` - number of concurrent dials the switch should allow. Defaults to `50`
6466
- `stats`: an object with the following keys and respective values:
6567
- `maxOldPeersRetention`: maximum old peers retention. For when peers disconnect and keeping the stats around in case they reconnect. Defaults to `100`.
6668
- `computeThrottleMaxQueueSize`: maximum queue size to perform stats computation throttling. Defaults to `1000`.

src/constants.js

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
'use strict'
2+
3+
module.exports = {
4+
BLACK_LIST_TTL: 120e3, // How long before an errored peer can be dialed again
5+
MAX_PARALLEL_DIALS: 50 // Maximum allowed concurrent dials
6+
}

src/dialer/index.js

+13-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
const DialQueueManager = require('./queueManager')
44
const getPeerInfo = require('../get-peer-info')
5+
const { MAX_PARALLEL_DIALS, BLACK_LIST_TTL } = require('../constants')
56

67
module.exports = function (_switch) {
78
const dialQueueManager = new DialQueueManager(_switch)
@@ -39,6 +40,14 @@ module.exports = function (_switch) {
3940
callback()
4041
}
4142

43+
/**
44+
* Clears the blacklist for a given peer
45+
* @param {PeerInfo} peerInfo
46+
*/
47+
function clearBlacklist (peerInfo) {
48+
dialQueueManager.clearBlacklist(peerInfo)
49+
}
50+
4251
/**
4352
* Adds the dial request to the queue for the given `peerInfo`
4453
* @param {PeerInfo} peerInfo
@@ -63,6 +72,9 @@ module.exports = function (_switch) {
6372
return {
6473
dial,
6574
dialFSM,
66-
abort
75+
abort,
76+
clearBlacklist,
77+
BLACK_LIST_TTL: isNaN(_switch._options.blacklistTTL) ? BLACK_LIST_TTL : _switch._options.blacklistTTL,
78+
MAX_PARALLEL_DIALS: isNaN(_switch._options.maxParallelDials) ? MAX_PARALLEL_DIALS : _switch._options.maxParallelDials
6779
}
6880
}

src/dialer/queue.js

+53-10
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict'
22

33
const ConnectionFSM = require('../connection')
4-
const { DIAL_ABORTED } = require('../errors')
4+
const { DIAL_ABORTED, ERR_BLACKLISTED } = require('../errors')
55
const Connection = require('interface-connection').Connection
66
const nextTick = require('async/nextTick')
77
const once = require('once')
@@ -63,12 +63,15 @@ class Queue {
6363
* @constructor
6464
* @param {string} peerId
6565
* @param {Switch} _switch
66+
* @param {function} onStopped Called when the queue stops
6667
*/
67-
constructor (peerId, _switch) {
68+
constructor (peerId, _switch, onStopped) {
6869
this.id = peerId
6970
this.switch = _switch
7071
this._queue = []
72+
this.blackListed = null
7173
this.isRunning = false
74+
this.onStopped = onStopped
7275
}
7376
get length () {
7477
return this._queue.length
@@ -80,39 +83,78 @@ class Queue {
8083
* @param {string} protocol
8184
* @param {boolean} useFSM If callback should use a ConnectionFSM instead
8285
* @param {function(Error, Connection)} callback
86+
* @returns {boolean} whether or not the queue has been started
8387
*/
8488
add (protocol, useFSM, callback) {
89+
if (!this.isDialAllowed()) {
90+
nextTick(callback, ERR_BLACKLISTED())
91+
return false
92+
}
8593
this._queue.push({ protocol, useFSM, callback })
86-
if (!this.isRunning) {
87-
log('starting dial queue to %s', this.id)
88-
this.start()
94+
return this.start()
95+
}
96+
97+
/**
98+
* Determines whether or not dialing is currently allowed
99+
* @returns {boolean}
100+
*/
101+
isDialAllowed () {
102+
if (this.blackListed) {
103+
// If the blacklist ttl has passed, reset it
104+
if (Date.now() - this.blackListed > this.switch.dialer.BLACK_LIST_TTL) {
105+
this.blackListed = null
106+
return true
107+
}
108+
// Dial is not allowed
109+
return false
89110
}
111+
return true
90112
}
91113

92114
/**
93-
* Starts the queue
115+
* Starts the queue. If the queue was started `true` will be returned.
116+
* If the queue was already running `false` is returned.
117+
* @returns {boolean}
94118
*/
95119
start () {
96-
this.isRunning = true
97-
this._run()
120+
if (!this.isRunning) {
121+
log('starting dial queue to %s', this.id)
122+
this.isRunning = true
123+
this._run()
124+
return true
125+
}
126+
return false
98127
}
99128

100129
/**
101130
* Stops the queue
102131
*/
103132
stop () {
104-
this.isRunning = false
133+
if (this.isRunning) {
134+
log('stopping dial queue to %s', this.id)
135+
this.isRunning = false
136+
this.onStopped()
137+
}
105138
}
106139

107140
/**
108141
* Stops the queue and errors the callback for each dial request
109142
*/
110143
abort () {
111-
this.stop()
112144
while (this.length > 0) {
113145
let dial = this._queue.shift()
114146
dial.callback(DIAL_ABORTED())
115147
}
148+
this.stop()
149+
}
150+
151+
/**
152+
* Marks the queue as blacklisted. The queue will be immediately aborted.
153+
*/
154+
blacklist () {
155+
log('blacklisting queue for %s', this.id)
156+
this.blackListed = Date.now()
157+
this.abort()
116158
}
117159

118160
/**
@@ -189,6 +231,7 @@ class Queue {
189231
// depending on the error.
190232
connectionFSM.once('error', (err) => {
191233
queuedDial.callback(err)
234+
this.blacklist()
192235
})
193236

194237
connectionFSM.once('close', () => {

src/dialer/queueManager.js

+57-7
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
const once = require('once')
44
const Queue = require('./queue')
5-
5+
const { DIAL_ABORTED } = require('../errors')
66
const noop = () => {}
77

88
class DialQueueManager {
@@ -11,8 +11,10 @@ class DialQueueManager {
1111
* @param {Switch} _switch
1212
*/
1313
constructor (_switch) {
14-
this._queue = {}
14+
this._queue = []
15+
this._queues = {}
1516
this.switch = _switch
17+
this.dials = 0
1618
}
1719

1820
/**
@@ -22,7 +24,14 @@ class DialQueueManager {
2224
* This causes the entire DialerQueue to be drained
2325
*/
2426
abort () {
25-
const queues = Object.values(this._queue)
27+
// Abort items in the general queue
28+
while (this._queue.length > 0) {
29+
let dial = this._queue.shift()
30+
dial.callback(DIAL_ABORTED())
31+
}
32+
33+
// Abort the individual peer queues
34+
const queues = Object.values(this._queues)
2635
queues.forEach(dialQueue => {
2736
dialQueue.abort()
2837
})
@@ -32,12 +41,53 @@ class DialQueueManager {
3241
* Adds the `dialRequest` to the queue and ensures the queue is running
3342
*
3443
* @param {DialRequest} dialRequest
44+
* @returns {void}
3545
*/
3646
add ({ peerInfo, protocol, useFSM, callback }) {
3747
callback = callback ? once(callback) : noop
3848

39-
let dialQueue = this.getQueue(peerInfo)
40-
dialQueue.add(protocol, useFSM, callback)
49+
// If the target queue is currently running, just add the dial
50+
// directly to it. This acts as a crude priority lane for multiple
51+
// calls to a peer.
52+
const targetQueue = this.getQueue(peerInfo)
53+
if (targetQueue.isRunning) {
54+
targetQueue.add(protocol, useFSM, callback)
55+
return
56+
}
57+
58+
this._queue.push({ peerInfo, protocol, useFSM, callback })
59+
this.run()
60+
}
61+
62+
/**
63+
* Will execute up to `MAX_PARALLEL_DIALS` dials
64+
*/
65+
run () {
66+
if (this.dials < this.switch.dialer.MAX_PARALLEL_DIALS && this._queue.length > 0) {
67+
let { peerInfo, protocol, useFSM, callback } = this._queue.shift()
68+
let dialQueue = this.getQueue(peerInfo)
69+
if (dialQueue.add(protocol, useFSM, callback)) {
70+
this.dials++
71+
}
72+
}
73+
}
74+
75+
/**
76+
* Will remove the `peerInfo` from the dial blacklist
77+
* @param {PeerInfo} peerInfo
78+
*/
79+
clearBlacklist (peerInfo) {
80+
this.getQueue(peerInfo).blackListed = null
81+
}
82+
83+
/**
84+
* A handler for when dialing queues stop. This will trigger
85+
* `run()` in order to keep the queue processing.
86+
* @private
87+
*/
88+
_onQueueStopped () {
89+
this.dials--
90+
this.run()
4191
}
4292

4393
/**
@@ -48,8 +98,8 @@ class DialQueueManager {
4898
getQueue (peerInfo) {
4999
const id = peerInfo.id.toB58String()
50100

51-
this._queue[id] = this._queue[id] || new Queue(id, this.switch)
52-
return this._queue[id]
101+
this._queues[id] = this._queues[id] || new Queue(id, this.switch, this._onQueueStopped.bind(this))
102+
return this._queues[id]
53103
}
54104
}
55105

src/errors.js

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const errCode = require('err-code')
55
module.exports = {
66
CONNECTION_FAILED: (err) => errCode(err, 'CONNECTION_FAILED'),
77
DIAL_ABORTED: () => errCode('Dial was aborted', 'DIAL_ABORTED'),
8+
ERR_BLACKLISTED: () => errCode('Dial is currently blacklisted for this peer', 'ERR_BLACKLISTED'),
89
DIAL_SELF: () => errCode('A node cannot dial itself', 'DIAL_SELF'),
910
INVALID_STATE_TRANSITION: (err) => errCode(err, 'INVALID_STATE_TRANSITION'),
1011
NO_TRANSPORTS_REGISTERED: () => errCode('No transports registered, dial not possible', 'NO_TRANSPORTS_REGISTERED'),

src/index.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,9 @@ class Switch extends EventEmitter {
110110
})
111111

112112
// higher level (public) API
113-
const dialer = getDialer(this)
114-
this.dial = dialer.dial
115-
this.dialFSM = dialer.dialFSM
113+
this.dialer = getDialer(this)
114+
this.dial = this.dialer.dial
115+
this.dialFSM = this.dialer.dialFSM
116116
}
117117

118118
/**

test/circuit-relay.node.js

+4-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ const getPorts = require('portfinder').getPorts
1919
const utils = require('./utils')
2020
const createInfos = utils.createInfos
2121
const Swarm = require('../src')
22+
const switchOptions = {
23+
blacklistTTL: 0 // nullifies blacklisting
24+
}
2225

2326
describe(`circuit`, function () {
2427
let swarmA // TCP and WS
@@ -36,7 +39,7 @@ describe(`circuit`, function () {
3639
peerA.multiaddrs.add('/ip4/0.0.0.0/tcp/9001')
3740
peerB.multiaddrs.add('/ip4/127.0.0.1/tcp/9002/ws')
3841

39-
swarmA = new Swarm(peerA, new PeerBook())
42+
swarmA = new Swarm(peerA, new PeerBook(), switchOptions)
4043
swarmB = new Swarm(peerB, new PeerBook())
4144
swarmC = new Swarm(peerC, new PeerBook())
4245

test/dial-fsm.node.js

+19
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ describe('dialFSM', () => {
103103
protocol = '/error/1.0.0'
104104
switchC.handle(protocol, () => { })
105105

106+
switchA.dialer.clearBlacklist(switchC._peerInfo)
106107
switchA.dialFSM(switchC._peerInfo, protocol, (err, connFSM) => {
107108
expect(err).to.not.exist()
108109
connFSM.once('error', (err) => {
@@ -113,6 +114,24 @@ describe('dialFSM', () => {
113114
})
114115
})
115116

117+
it('should error when the peer is blacklisted', (done) => {
118+
protocol = '/error/1.0.0'
119+
switchC.handle(protocol, () => { })
120+
121+
switchA.dialer.clearBlacklist(switchC._peerInfo)
122+
switchA.dialFSM(switchC._peerInfo, protocol, (err, connFSM) => {
123+
expect(err).to.not.exist()
124+
connFSM.once('error', () => {
125+
// dial with the blacklist
126+
switchA.dialFSM(switchC._peerInfo, protocol, (err) => {
127+
expect(err).to.exist()
128+
expect(err.code).to.eql('ERR_BLACKLISTED')
129+
done()
130+
})
131+
})
132+
})
133+
})
134+
116135
it('should emit a `closed` event when closed', (done) => {
117136
protocol = '/closed/1.0.0'
118137
switchB.handle(protocol, () => { })

0 commit comments

Comments
 (0)