Skip to content

Commit 0357bf2

Browse files
authored
feat: support a priority queue for dials (libp2p#325)
1 parent 4862c48 commit 0357bf2

File tree

9 files changed

+112
-29
lines changed

9 files changed

+112
-29
lines changed

README.md

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,16 @@ works like dial, but calls back with a [Connection State Machine](#connection-st
153153
Connection state machines emit a number of events that can be used to determine the current state of the connection
154154
and to received the underlying connection that can be used to transfer data.
155155
156+
### `switch.dialer.connect(peer, options, callback)`
157+
158+
a low priority dial to the provided peer. Calls to `dial` and `dialFSM` will take priority. This should be used when an application only wishes to establish connections to new peers, such as during peer discovery when there is a low peer count. Currently, anything greater than the HIGH_PRIORITY (10) will be placed into the cold call queue, and anything less than or equal to the HIGH_PRIORITY will be added to the normal queue.
159+
160+
- `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][]
161+
- `options`: Optional
162+
- `options.priority`: Number of the priority of the dial, defaults to 20.
163+
- `options.useFSM`: Boolean of whether or not to callback with a [Connection State Machine](#connection-state-machine)
164+
- `callback`: Function with signature `function (err, connFSM) {}` where `connFSM` is a [Connection State Machine](#connection-state-machine)
165+
156166
##### Events
157167
- `error`: emitted whenever a fatal error occurs with the connection; the error will be emitted.
158168
- `error:upgrade_failed`: emitted whenever the connection fails to upgrade with a muxer, this is not fatal.
@@ -187,7 +197,17 @@ Emitted when the switch encounters an error.
187197

188198
### `switch.on('peer-mux-closed', (peer) => {})`
189199

190-
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection.
200+
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection with.
201+
202+
### `switch.on('connection:start', (peer) => {})`
203+
This will be triggered anytime a new connection is created.
204+
205+
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just started a connection with.
206+
207+
### `switch.on('connection:end', (peer) => {})`
208+
This will be triggered anytime an existing connection, regardless of state, is removed from the switch's internal connection tracking.
209+
210+
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a connection with.
191211

192212
### `switch.on('start', () => {})`
193213

src/connection/base.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ class BaseConnection extends EventEmitter {
8080
* @returns {void}
8181
*/
8282
_onDisconnected () {
83+
this.switch.connection.remove(this)
8384
this.log('disconnected from %s', this.theirB58Id)
8485
this.emit('close')
8586
this.removeAllListeners()

src/connection/index.js

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,8 +269,6 @@ class ConnectionFSM extends BaseConnection {
269269
_onDisconnecting () {
270270
this.log('disconnecting from %s', this.theirB58Id, Boolean(this.muxer))
271271

272-
this.switch.connection.remove(this)
273-
274272
delete this.switch.conns[this.theirB58Id]
275273

276274
let tasks = []

src/connection/manager.js

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,12 @@ class ConnectionManager {
3333
// Only add it if it's not there
3434
if (!this.get(connection)) {
3535
this.connections[connection.theirB58Id].push(connection)
36-
this.switch.emit('peer-mux-established', connection.theirPeerInfo)
36+
this.switch.emit('connection:start', connection.theirPeerInfo)
37+
if (connection.getState() === 'MUXED') {
38+
this.switch.emit('peer-mux-established', connection.theirPeerInfo)
39+
} else {
40+
connection.once('muxed', () => this.switch.emit('peer-mux-established', connection.theirPeerInfo))
41+
}
3742
}
3843
}
3944

@@ -81,8 +86,10 @@ class ConnectionManager {
8186
remove (connection) {
8287
// No record of the peer, disconnect it
8388
if (!this.connections[connection.theirB58Id]) {
84-
connection.theirPeerInfo.disconnect()
85-
this.switch.emit('peer-mux-closed', connection.theirPeerInfo)
89+
if (connection.theirPeerInfo) {
90+
connection.theirPeerInfo.disconnect()
91+
this.switch.emit('peer-mux-closed', connection.theirPeerInfo)
92+
}
8693
return
8794
}
8895

@@ -99,6 +106,9 @@ class ConnectionManager {
99106
connection.theirPeerInfo.disconnect()
100107
this.switch.emit('peer-mux-closed', connection.theirPeerInfo)
101108
}
109+
110+
// A tracked connection was closed, let the world know
111+
this.switch.emit('connection:end', connection.theirPeerInfo)
102112
}
103113

104114
/**

src/constants.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,7 @@ module.exports = {
66
DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take
77
MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued
88
MAX_PARALLEL_DIALS: 100, // Maximum allowed concurrent dials
9-
QUARTER_HOUR: 15 * 60e3
9+
QUARTER_HOUR: 15 * 60e3,
10+
PRIORITY_HIGH: 10,
11+
PRIORITY_LOW: 20
1012
}

src/dialer/index.js

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ const {
66
BLACK_LIST_ATTEMPTS,
77
BLACK_LIST_TTL,
88
MAX_COLD_CALLS,
9-
MAX_PARALLEL_DIALS
9+
MAX_PARALLEL_DIALS,
10+
PRIORITY_HIGH,
11+
PRIORITY_LOW
1012
} = require('../constants')
1113

1214
module.exports = function (_switch) {
@@ -19,7 +21,7 @@ module.exports = function (_switch) {
1921
* @param {DialRequest} dialRequest
2022
* @returns {void}
2123
*/
22-
function _dial ({ peerInfo, protocol, useFSM, callback }) {
24+
function _dial ({ peerInfo, protocol, options, callback }) {
2325
if (typeof protocol === 'function') {
2426
callback = protocol
2527
protocol = null
@@ -32,7 +34,7 @@ module.exports = function (_switch) {
3234
}
3335

3436
// Add it to the queue, it will automatically get executed
35-
dialQueueManager.add({ peerInfo, protocol, useFSM, callback })
37+
dialQueueManager.add({ peerInfo, protocol, options, callback })
3638
}
3739

3840
/**
@@ -64,14 +66,33 @@ module.exports = function (_switch) {
6466
dialQueueManager.clearBlacklist(peerInfo)
6567
}
6668

69+
/**
70+
* Attempts to establish a connection to the given `peerInfo` at
71+
* a lower priority than a standard dial.
72+
* @param {PeerInfo} peerInfo
73+
* @param {object} options
74+
* @param {boolean} options.useFSM Whether or not to return a `ConnectionFSM`. Defaults to false.
75+
* @param {number} options.priority Lowest priority goes first. Defaults to 20.
76+
* @param {function(Error, Connection)} callback
77+
*/
78+
function connect (peerInfo, options, callback) {
79+
if (typeof options === 'function') {
80+
callback = options
81+
options = null
82+
}
83+
options = { useFSM: false, priority: PRIORITY_LOW, ...options }
84+
_dial({ peerInfo, protocol: null, options, callback })
85+
}
86+
6787
/**
6888
* Adds the dial request to the queue for the given `peerInfo`
89+
* The request will be added with a high priority (10).
6990
* @param {PeerInfo} peerInfo
7091
* @param {string} protocol
7192
* @param {function(Error, Connection)} callback
7293
*/
7394
function dial (peerInfo, protocol, callback) {
74-
_dial({ peerInfo, protocol, useFSM: false, callback })
95+
_dial({ peerInfo, protocol, options: { useFSM: false, priority: PRIORITY_HIGH }, callback })
7596
}
7697

7798
/**
@@ -82,10 +103,11 @@ module.exports = function (_switch) {
82103
* @param {function(Error, ConnectionFSM)} callback
83104
*/
84105
function dialFSM (peerInfo, protocol, callback) {
85-
_dial({ peerInfo, protocol, useFSM: true, callback })
106+
_dial({ peerInfo, protocol, options: { useFSM: true, priority: PRIORITY_HIGH }, callback })
86107
}
87108

88109
return {
110+
connect,
89111
dial,
90112
dialFSM,
91113
clearBlacklist,

src/dialer/queue.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ log.error = debug('libp2p:switch:dial:error')
1313
* @typedef {Object} DialRequest
1414
* @property {PeerInfo} peerInfo - The peer to dial to
1515
* @property {string} [protocol] - The protocol to create a stream for
16-
* @property {boolean} useFSM - If `callback` should return a ConnectionFSM
16+
* @property {object} options
17+
* @property {boolean} options.useFSM - If `callback` should return a ConnectionFSM
18+
* @property {number} options.priority - The priority of the dial
1719
* @property {function(Error, Connection|ConnectionFSM)} callback
1820
*/
1921

src/dialer/queueManager.js

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ const Queue = require('./queue')
55
const { DIAL_ABORTED } = require('../errors')
66
const nextTick = require('async/nextTick')
77
const retimer = require('retimer')
8-
const { QUARTER_HOUR } = require('../constants')
8+
const { QUARTER_HOUR, PRIORITY_HIGH } = require('../constants')
99
const debug = require('debug')
1010
const log = debug('libp2p:switch:dial:manager')
1111
const noop = () => {}
@@ -103,17 +103,25 @@ class DialQueueManager {
103103
* @param {DialRequest} dialRequest
104104
* @returns {void}
105105
*/
106-
add ({ peerInfo, protocol, useFSM, callback }) {
106+
add ({ peerInfo, protocol, options, callback }) {
107107
callback = callback ? once(callback) : noop
108108

109109
// Add the dial to its respective queue
110110
const targetQueue = this.getQueue(peerInfo)
111-
// If we have too many cold calls, abort the dial immediately
112-
if (this._coldCallQueue.size >= this.switch.dialer.MAX_COLD_CALLS && !protocol) {
113-
return nextTick(callback, DIAL_ABORTED())
111+
112+
// Cold Call
113+
if (options.priority > PRIORITY_HIGH) {
114+
// If we have too many cold calls, abort the dial immediately
115+
if (this._coldCallQueue.size >= this.switch.dialer.MAX_COLD_CALLS) {
116+
return nextTick(callback, DIAL_ABORTED())
117+
}
118+
119+
if (this._queue.has(targetQueue.id)) {
120+
return nextTick(callback, DIAL_ABORTED())
121+
}
114122
}
115123

116-
targetQueue.add(protocol, useFSM, callback)
124+
targetQueue.add(protocol, options.useFSM, callback)
117125

118126
// If we're already connected to the peer, start the queue now
119127
// While it might cause queues to go over the max parallel amount,
@@ -130,15 +138,12 @@ class DialQueueManager {
130138

131139
// Add the id to its respective queue set if the queue isn't running
132140
if (!targetQueue.isRunning) {
133-
if (protocol) {
141+
if (options.priority <= PRIORITY_HIGH) {
134142
this._queue.add(targetQueue.id)
135143
this._coldCallQueue.delete(targetQueue.id)
136144
// Only add it to the cold queue if it's not in the normal queue
137-
} else if (!this._queue.has(targetQueue.id)) {
138-
this._coldCallQueue.add(targetQueue.id)
139-
// The peer is already in the normal queue, abort the cold call
140145
} else {
141-
return nextTick(callback, DIAL_ABORTED())
146+
this._coldCallQueue.add(targetQueue.id)
142147
}
143148
}
144149

test/dialer.spec.js

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,20 @@ const PeerBook = require('peer-book')
1212
const Queue = require('../src/dialer/queue')
1313
const QueueManager = require('../src/dialer/queueManager')
1414
const Switch = require('../src')
15+
const { PRIORITY_HIGH, PRIORITY_LOW } = require('../src/constants')
1516

1617
const utils = require('./utils')
1718
const createInfos = utils.createInfos
1819

1920
describe('dialer', () => {
2021
let switchA
22+
let switchB
2123

22-
before((done) => createInfos(1, (err, infos) => {
24+
before((done) => createInfos(2, (err, infos) => {
2325
expect(err).to.not.exist()
2426

2527
switchA = new Switch(infos[0], new PeerBook())
28+
switchB = new Switch(infos[1], new PeerBook())
2629

2730
done()
2831
}))
@@ -31,6 +34,26 @@ describe('dialer', () => {
3134
sinon.restore()
3235
})
3336

37+
describe('connect', () => {
38+
afterEach(() => {
39+
switchA.dialer.clearBlacklist(switchB._peerInfo)
40+
})
41+
42+
it('should use default options', (done) => {
43+
switchA.dialer.connect(switchB._peerInfo, (err) => {
44+
expect(err).to.exist()
45+
done()
46+
})
47+
})
48+
49+
it('should be able to use custom options', (done) => {
50+
switchA.dialer.connect(switchB._peerInfo, { useFSM: true, priority: PRIORITY_HIGH }, (err) => {
51+
expect(err).to.exist()
52+
done()
53+
})
54+
})
55+
})
56+
3457
describe('queue', () => {
3558
it('should blacklist forever after 5 blacklists', () => {
3659
const queue = new Queue('QM', switchA)
@@ -58,7 +81,7 @@ describe('dialer', () => {
5881
id: { toB58String: () => 'QmA' }
5982
},
6083
protocol: null,
61-
useFSM: true,
84+
options: { useFSM: true, priority: PRIORITY_LOW },
6285
callback: (err) => {
6386
expect(err.code).to.eql('DIAL_ABORTED')
6487
done()
@@ -75,7 +98,7 @@ describe('dialer', () => {
7598
isConnected: () => null
7699
},
77100
protocol: '/echo/1.0.0',
78-
useFSM: true,
101+
options: { useFSM: true, priority: PRIORITY_HIGH },
79102
callback: () => {}
80103
}
81104

@@ -99,7 +122,7 @@ describe('dialer', () => {
99122
isConnected: () => null
100123
},
101124
protocol: null,
102-
useFSM: true,
125+
options: { useFSM: true, priority: PRIORITY_LOW },
103126
callback: () => {}
104127
}
105128

@@ -120,7 +143,7 @@ describe('dialer', () => {
120143
isConnected: () => null
121144
},
122145
protocol: null,
123-
useFSM: true,
146+
options: { useFSM: true, priority: PRIORITY_LOW },
124147
callback: (err) => {
125148
expect(runSpy.called).to.eql(false)
126149
expect(hasSpy.called).to.eql(true)

0 commit comments

Comments
 (0)