Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

Commit b04e954

Browse files
feat(transport): use parallel limited dialer
Replaces #193 Fixes #194
1 parent 12dc042 commit b04e954

File tree

3 files changed

+8639
-21
lines changed

3 files changed

+8639
-21
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,4 @@
8686
"greenkeeper[bot] <greenkeeper[bot]@users.noreply.github.com>",
8787
"ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ <[email protected]>"
8888
]
89-
}
89+
}

src/transport.js

Lines changed: 70 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,19 @@
22

33
const Connection = require('interface-connection').Connection
44
const parallel = require('async/parallel')
5+
const queue = require('async/queue')
56
const once = require('once')
67
const debug = require('debug')
78
const log = debug('libp2p:swarm:transport')
89

910
const protocolMuxer = require('./protocol-muxer')
1011

12+
// number of concurrent outbound dials to make per peer, same as go-libp2p-swarm
13+
const defaultPerPeerRateLimit = 8
14+
1115
module.exports = function (swarm) {
16+
const queues = new Map()
17+
1218
return {
1319
add (key, transport, options, callback) {
1420
if (typeof options === 'function') {
@@ -36,32 +42,76 @@ module.exports = function (swarm) {
3642
multiaddrs = [multiaddrs]
3743
}
3844
log('dialing %s', key, multiaddrs.map((m) => m.toString()))
39-
// a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that)
45+
// filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that)
4046
multiaddrs = dialables(t, multiaddrs)
4147

42-
// b) if multiaddrs.length = 1, return the conn from the
43-
// transport, otherwise, create a passthrough
44-
if (multiaddrs.length === 1) {
45-
const conn = t.dial(multiaddrs.shift(), (err) => {
46-
if (err) return callback(err)
47-
callback(null, new Connection(conn))
48-
})
49-
return
50-
}
48+
// create dial queue if non exists
49+
let q
50+
if (queues.has(key)) {
51+
log('reusing queue')
52+
q = queues.get(key)
53+
} else {
54+
log('setting up new queue')
55+
q = queue((multiaddr, cb) => {
56+
const conn = t.dial(multiaddr, (err) => {
57+
if (err) {
58+
log('dial failed: %s', multiaddr.toString())
59+
return cb(err)
60+
}
61+
if (q.canceled) {
62+
log('dial canceled: %s', multiaddr.toString())
63+
// clean up already done dials
64+
65+
return cb()
66+
}
67+
// one is enough
68+
log('dial success: %s', multiaddr.toString())
69+
q.kill()
70+
q.canceled = true
71+
72+
q.finish(null, conn)
73+
})
74+
}, defaultPerPeerRateLimit)
75+
76+
q.errors = []
77+
q.finishCbs = []
5178

52-
// c) multiaddrs should already be a filtered list
53-
// specific for the transport we are using
54-
const proxyConn = new Connection()
79+
// handle finish
80+
q.finish = (err, conn) => {
81+
log('queue finish')
82+
queues.delete(key)
5583

56-
next(multiaddrs.shift())
84+
q.finishCbs.forEach((next) => {
85+
if (err) {
86+
next(err)
87+
} else {
88+
const proxyConn = new Connection()
89+
proxyConn.setInnerConn(conn)
90+
91+
next(null, proxyConn)
92+
}
93+
})
94+
}
5795

58-
// TODO improve in the future to make all the dials in paralell
59-
function next (multiaddr) {
60-
const conn = t.dial(multiaddr, () => {
61-
proxyConn.setInnerConn(conn)
62-
callback(null, proxyConn)
63-
})
96+
// collect errors
97+
q.error = (err) => {
98+
q.errors.push(err)
99+
}
100+
101+
// no more addresses and all failed
102+
q.drain = () => {
103+
log('queue drain')
104+
const err = new Error('Could not dial any address')
105+
err.errors = q.errors
106+
q.errors = []
107+
q.finish(err)
108+
}
109+
110+
queues.set(key, q)
64111
}
112+
113+
q.push(multiaddrs)
114+
q.finishCbs.push(callback)
65115
},
66116

67117
listen (key, options, handler, callback) {

0 commit comments

Comments
 (0)