Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

Commit a15e63c

Browse files
dignifiedquiredaviddias
authored andcommitted
feat(transport): use parallel limited dialer (#195)
* feat(transport): use parallel limited dialer
1 parent 12dc042 commit a15e63c

File tree

4 files changed

+122
-28
lines changed

4 files changed

+122
-28
lines changed

package.json

+8-8
Original file line numberDiff line numberDiff line change
@@ -40,35 +40,35 @@
4040
"npm": ">=3.0.0"
4141
},
4242
"devDependencies": {
43-
"aegir": "^11.0.0",
43+
"aegir": "^11.0.1",
4444
"buffer-loader": "0.0.1",
4545
"chai": "^3.5.0",
4646
"dirty-chai": "^1.2.2",
4747
"gulp": "^3.9.1",
4848
"libp2p-multiplex": "~0.4.3",
4949
"libp2p-secio": "~0.6.8",
5050
"libp2p-spdy": "~0.10.6",
51-
"libp2p-tcp": "~0.9.4",
51+
"libp2p-tcp": "~0.10.0",
5252
"libp2p-webrtc-star": "~0.8.10",
53-
"libp2p-websockets": "~0.9.4",
53+
"libp2p-websockets": "~0.10.0",
5454
"pre-commit": "^1.2.2",
5555
"pull-goodbye": "0.0.1",
5656
"pull-stream": "^3.5.0",
5757
"webrtcsupport": "^2.2.0"
5858
},
5959
"dependencies": {
60-
"async": "^2.1.5",
60+
"async": "^2.2.0",
6161
"browserify-zlib-next": "^1.0.1",
6262
"debug": "^2.6.3",
6363
"interface-connection": "~0.3.2",
6464
"ip-address": "^5.8.6",
6565
"libp2p-identify": "~0.3.3",
6666
"lodash.includes": "^4.3.0",
67-
"multiaddr": "^2.2.2",
67+
"multiaddr": "^2.2.3",
6868
"multistream-select": "~0.13.5",
6969
"once": "^1.4.0",
70-
"peer-id": "~0.8.4",
71-
"peer-info": "~0.8.4",
70+
"peer-id": "~0.8.5",
71+
"peer-info": "~0.8.5",
7272
"protocol-buffers": "^3.2.1"
7373
},
7474
"contributors": [
@@ -86,4 +86,4 @@
8686
"greenkeeper[bot] <greenkeeper[bot]@users.noreply.github.com>",
8787
"ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ <[email protected]>"
8888
]
89-
}
89+
}

src/transport.js

+87-20
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,23 @@
22

33
const Connection = require('interface-connection').Connection
44
const parallel = require('async/parallel')
5+
const queue = require('async/queue')
6+
const timeout = require('async/timeout')
57
const once = require('once')
68
const debug = require('debug')
79
const log = debug('libp2p:swarm:transport')
810

911
const protocolMuxer = require('./protocol-muxer')
1012

13+
// number of concurrent outbound dials to make per peer, same as go-libp2p-swarm
14+
const defaultPerPeerRateLimit = 8
15+
16+
// the amount of time a single dial has to succeed
17+
const dialTimeout = 10 * 1000
18+
1119
module.exports = function (swarm) {
20+
const queues = new Map()
21+
1222
return {
1323
add (key, transport, options, callback) {
1424
if (typeof options === 'function') {
@@ -36,32 +46,80 @@ module.exports = function (swarm) {
3646
multiaddrs = [multiaddrs]
3747
}
3848
log('dialing %s', key, multiaddrs.map((m) => m.toString()))
39-
// a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that)
49+
// filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that)
4050
multiaddrs = dialables(t, multiaddrs)
4151

42-
// b) if multiaddrs.length = 1, return the conn from the
43-
// transport, otherwise, create a passthrough
44-
if (multiaddrs.length === 1) {
45-
const conn = t.dial(multiaddrs.shift(), (err) => {
46-
if (err) return callback(err)
47-
callback(null, new Connection(conn))
48-
})
49-
return
50-
}
52+
// create dial queue if non exists
53+
let q
54+
if (queues.has(key)) {
55+
log('reusing queue')
56+
q = queues.get(key)
57+
} else {
58+
log('setting up new queue')
59+
q = queue((multiaddr, cb) => {
60+
dialWithTimeout(t, multiaddr, dialTimeout, (err, conn) => {
61+
if (err) {
62+
log('dial err', err)
63+
return cb(err)
64+
}
65+
66+
if (q.canceled) {
67+
log('dial canceled: %s', multiaddr.toString())
68+
// clean up already done dials
69+
if (conn) {
70+
conn.close()
71+
}
72+
return cb()
73+
}
74+
75+
// one is enough
76+
log('dial success: %s', multiaddr.toString())
77+
q.kill()
78+
q.canceled = true
79+
80+
q.finish(null, conn)
81+
})
82+
}, defaultPerPeerRateLimit)
83+
84+
q.errors = []
85+
q.finishCbs = []
5186

52-
// c) multiaddrs should already be a filtered list
53-
// specific for the transport we are using
54-
const proxyConn = new Connection()
87+
// handle finish
88+
q.finish = (err, conn) => {
89+
log('queue finish')
90+
queues.delete(key)
5591

56-
next(multiaddrs.shift())
92+
q.finishCbs.forEach((next) => {
93+
if (err) {
94+
return next(err)
95+
}
5796

58-
// TODO improve in the future to make all the dials in paralell
59-
function next (multiaddr) {
60-
const conn = t.dial(multiaddr, () => {
61-
proxyConn.setInnerConn(conn)
62-
callback(null, proxyConn)
63-
})
97+
const proxyConn = new Connection()
98+
proxyConn.setInnerConn(conn)
99+
100+
next(null, proxyConn)
101+
})
102+
}
103+
104+
// collect errors
105+
q.error = (err) => {
106+
q.errors.push(err)
107+
}
108+
109+
// no more addresses and all failed
110+
q.drain = () => {
111+
log('queue drain')
112+
const err = new Error('Could not dial any address')
113+
err.errors = q.errors
114+
q.errors = []
115+
q.finish(err)
116+
}
117+
118+
queues.set(key, q)
64119
}
120+
121+
q.push(multiaddrs)
122+
q.finishCbs.push(callback)
65123
},
66124

67125
listen (key, options, handler, callback) {
@@ -134,4 +192,13 @@ function dialables (tp, multiaddrs) {
134192
return tp.filter(multiaddrs)
135193
}
136194

195+
function dialWithTimeout (transport, multiaddr, maxTimeout, callback) {
196+
timeout((cb) => {
197+
const conn = transport.dial(multiaddr, (err) => {
198+
log('dialed')
199+
cb(err, conn)
200+
})
201+
}, maxTimeout)(callback)
202+
}
203+
137204
function noop () {}

test/01-transport-tcp.node.js

+15
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ describe('transport - tcp', function () {
100100
it('dial to set of multiaddr, only one is available', (done) => {
101101
const conn = swarmA.transport.dial('tcp', [
102102
multiaddr('/ip4/127.0.0.1/tcp/9910/ws'), // not valid on purpose
103+
multiaddr('/ip4/127.0.0.1/tcp/9359'),
104+
multiaddr('/ip4/127.0.0.1/tcp/9329'),
103105
multiaddr('/ip4/127.0.0.1/tcp/9910'),
104106
multiaddr('/ip4/127.0.0.1/tcp/9999'),
105107
multiaddr('/ip4/127.0.0.1/tcp/9309')
@@ -114,6 +116,19 @@ describe('transport - tcp', function () {
114116
)
115117
})
116118

119+
it('dial to set of multiaddr, none is available', (done) => {
120+
swarmA.transport.dial('tcp', [
121+
multiaddr('/ip4/127.0.0.1/tcp/9910/ws'), // not valid on purpose
122+
multiaddr('/ip4/127.0.0.1/tcp/9359'),
123+
multiaddr('/ip4/127.0.0.1/tcp/9329')
124+
], (err, conn) => {
125+
expect(err).to.exist()
126+
expect(err.errors).to.have.length(2)
127+
expect(conn).to.not.exist()
128+
done()
129+
})
130+
})
131+
117132
it('close', (done) => {
118133
parallel([
119134
(cb) => swarmA.transport.close('tcp', cb),

test/03-transport-websockets.node.js

+12
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,18 @@ describe('transport - websockets', function () {
102102
})
103103
})
104104

105+
it('dial to set of multiaddr, none is available', (done) => {
106+
swarmA.transport.dial('ws', [
107+
multiaddr('/ip4/127.0.0.1/tcp/9320/ws'),
108+
multiaddr('/ip4/127.0.0.1/tcp/9359/ws')
109+
], (err, conn) => {
110+
expect(err).to.exist()
111+
expect(err.errors).to.have.length(2)
112+
expect(conn).to.not.exist()
113+
done()
114+
})
115+
})
116+
105117
it('close', (done) => {
106118
parallel([
107119
(cb) => swarmA.transport.close('ws', cb),

0 commit comments

Comments
 (0)