Skip to content

Commit 6a94d9a

Browse files
authored
feat: add basic dial queue to avoid many connections to peer (libp2p#310)
BREAKING CHANGE: This adds a very basic dial queue peer peer. This will prevent multiple, simultaneous dial requests to the same peer from creating multiple connections. The requests will be queued per peer, and will leverage the same connection when possible. The breaking change here is that `.dial`, will no longer return a connection. js-libp2p, circuit relay, and kad-dht, which use `.dial` were not using the returned connection. So while this is a breaking change it should not break the existing libp2p stack. If custom applications are leveraging the returned connection, they will need to convert to only using the connection returned via the callback. * chore: dont log priviatized unless it actually happened * refactor: only get our addresses for filtering once
1 parent 4216144 commit 6a94d9a

18 files changed

+537
-236
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
"peer-book": "~0.9.1",
5555
"portfinder": "^1.0.20",
5656
"pull-length-prefixed": "^1.3.1",
57-
"pull-mplex": "~0.1.0",
57+
"pull-mplex": "~0.1.2",
5858
"pull-pair": "^1.1.0",
5959
"sinon": "^7.2.3",
6060
"webrtcsupport": "^2.2.0"

src/connection/base.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ class BaseConnection extends EventEmitter {
9292
* @returns {void}
9393
*/
9494
_onPrivatized () {
95-
this.log('successfully privatized incoming connection')
9695
this.emit('private', this.conn)
9796
}
9897

src/connection/handler.js

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,31 +14,32 @@ function listener (_switch) {
1414
* @param {function} handler A custom handler to use
1515
* @returns {function(Connection)} A connection handler function
1616
*/
17-
return (transportKey, handler) => {
17+
return function (transportKey, handler) {
1818
/**
1919
* Takes a base connection and manages listening behavior
2020
*
2121
* @param {Connection} conn The connection to manage
2222
* @returns {void}
2323
*/
24-
return (conn) => {
25-
// Add a transport level observer, if needed
26-
const connection = transportKey ? observeConn(transportKey, null, conn, _switch.observer) : conn
24+
return function (conn) {
25+
log('received incoming connection for transport %s', transportKey)
26+
conn.getPeerInfo((_, peerInfo) => {
27+
// Add a transport level observer, if needed
28+
const connection = transportKey ? observeConn(transportKey, null, conn, _switch.observer) : conn
29+
const connFSM = new IncomingConnection({ connection, _switch, transportKey, peerInfo })
2730

28-
log('received incoming connection')
29-
const connFSM = new IncomingConnection({ connection, _switch, transportKey })
31+
connFSM.once('error', (err) => log(err))
32+
connFSM.once('private', (_conn) => {
33+
// Use the custom handler, if it was provided
34+
if (handler) {
35+
return handler(_conn)
36+
}
37+
connFSM.encrypt()
38+
})
39+
connFSM.once('encrypted', () => connFSM.upgrade())
3040

31-
connFSM.once('error', (err) => log(err))
32-
connFSM.once('private', (_conn) => {
33-
// Use the custom handler, if it was provided
34-
if (handler) {
35-
return handler(_conn)
36-
}
37-
connFSM.encrypt()
41+
connFSM.protect()
3842
})
39-
connFSM.once('encrypted', () => connFSM.upgrade())
40-
41-
connFSM.protect()
4243
}
4344
}
4445
}

src/connection/incoming.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@ const withIs = require('class-is')
77
const BaseConnection = require('./base')
88

99
class IncomingConnectionFSM extends BaseConnection {
10-
constructor ({ connection, _switch, transportKey }) {
10+
constructor ({ connection, _switch, transportKey, peerInfo }) {
1111
super({
1212
_switch,
1313
name: `inc:${_switch._peerInfo.id.toB58String().slice(0, 8)}`
1414
})
1515
this.conn = connection
16-
this.theirPeerInfo = null
16+
this.theirPeerInfo = peerInfo || null
17+
this.theirB58Id = this.theirPeerInfo ? this.theirPeerInfo.id.toB58String() : null
1718
this.ourPeerInfo = this.switch._peerInfo
1819
this.transportKey = transportKey
1920
this.protocolMuxer = this.switch.protocolMuxer(this.transportKey)

src/connection/index.js

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const Circuit = require('libp2p-circuit')
55
const multistream = require('multistream-select')
66
const withIs = require('class-is')
77
const BaseConnection = require('./base')
8+
const parallel = require('async/parallel')
89

910
const observeConnection = require('../observe-connection')
1011
const {
@@ -33,7 +34,7 @@ const {
3334
*/
3435
class ConnectionFSM extends BaseConnection {
3536
/**
36-
* @param {ConnectionOptions} param0
37+
* @param {ConnectionOptions} connectionOptions
3738
* @constructor
3839
*/
3940
constructor ({ _switch, peerInfo, muxer, conn, type = 'out' }) {
@@ -261,7 +262,7 @@ class ConnectionFSM extends BaseConnection {
261262
* @returns {void}
262263
*/
263264
_onDisconnecting () {
264-
this.log('disconnecting from %s', this.theirB58Id)
265+
this.log('disconnecting from %s', this.theirB58Id, Boolean(this.muxer))
265266

266267
// Issue disconnects on both Peers
267268
if (this.theirPeerInfo) {
@@ -272,22 +273,31 @@ class ConnectionFSM extends BaseConnection {
272273

273274
delete this.switch.conns[this.theirB58Id]
274275

276+
let tasks = []
277+
275278
// Clean up stored connections
276279
if (this.muxer) {
277-
this.muxer.end()
278-
delete this.muxer
279-
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
280+
tasks.push((cb) => {
281+
this.muxer.end(() => {
282+
delete this.muxer
283+
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
284+
cb()
285+
})
286+
})
280287
}
281288

282289
// If we have the base connection, abort it
290+
// Ignore abort errors, since we're closing
283291
if (this.conn) {
284-
this.conn.source(true, () => {
285-
this._state('done')
286-
delete this.conn
287-
})
288-
} else {
289-
this._state('done')
292+
try {
293+
this.conn.source.abort()
294+
} catch (_) { }
295+
delete this.conn
290296
}
297+
298+
parallel(tasks, () => {
299+
this._state('done')
300+
})
291301
}
292302

293303
/**
@@ -366,8 +376,6 @@ class ConnectionFSM extends BaseConnection {
366376
const conn = observeConnection(null, key, _conn, this.switch.observer)
367377

368378
this.muxer = this.switch.muxers[key].dialer(conn)
369-
// this.switch.muxedConns[this.theirB58Id] = this
370-
this.switch.connection.add(this)
371379

372380
this.muxer.once('close', () => {
373381
this.close()

src/connection/manager.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,12 @@ class ConnectionManager {
6161
*/
6262
getOne (peerId) {
6363
if (this.connections[peerId]) {
64-
// TODO: Maybe select the best?
65-
return this.connections[peerId][0]
64+
// Only return muxed connections
65+
for (var i = 0; i < this.connections[peerId].length; i++) {
66+
if (this.connections[peerId][i].getState() === 'MUXED') {
67+
return this.connections[peerId][i]
68+
}
69+
}
6670
}
6771
return null
6872
}

src/dialer.js

Lines changed: 0 additions & 110 deletions
This file was deleted.

src/dialer/index.js

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
'use strict'
2+
3+
const DialQueueManager = require('./queueManager')
4+
const getPeerInfo = require('../get-peer-info')
5+
6+
module.exports = function (_switch) {
7+
const dialQueueManager = new DialQueueManager(_switch)
8+
9+
_switch.state.on('STOPPING:enter', abort)
10+
11+
/**
12+
* @param {DialRequest} dialRequest
13+
* @returns {void}
14+
*/
15+
function _dial ({ peerInfo, protocol, useFSM, callback }) {
16+
if (typeof protocol === 'function') {
17+
callback = protocol
18+
protocol = null
19+
}
20+
21+
try {
22+
peerInfo = getPeerInfo(peerInfo, _switch._peerBook)
23+
} catch (err) {
24+
return callback(err)
25+
}
26+
27+
// Add it to the queue, it will automatically get executed
28+
dialQueueManager.add({ peerInfo, protocol, useFSM, callback })
29+
}
30+
31+
/**
32+
* Aborts all dials that are queued. This should
33+
* only be used when the Switch is being stopped
34+
*
35+
* @param {function} callback
36+
*/
37+
function abort (callback) {
38+
dialQueueManager.abort()
39+
callback()
40+
}
41+
42+
/**
43+
* Adds the dial request to the queue for the given `peerInfo`
44+
* @param {PeerInfo} peerInfo
45+
* @param {string} protocol
46+
* @param {function(Error, Connection)} callback
47+
*/
48+
function dial (peerInfo, protocol, callback) {
49+
_dial({ peerInfo, protocol, useFSM: false, callback })
50+
}
51+
52+
/**
53+
* Behaves like dial, except it calls back with a ConnectionFSM
54+
*
55+
* @param {PeerInfo} peerInfo
56+
* @param {string} protocol
57+
* @param {function(Error, ConnectionFSM)} callback
58+
*/
59+
function dialFSM (peerInfo, protocol, callback) {
60+
_dial({ peerInfo, protocol, useFSM: true, callback })
61+
}
62+
63+
return {
64+
dial,
65+
dialFSM,
66+
abort
67+
}
68+
}

0 commit comments

Comments
 (0)