Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

feat(transport): use parallel limited dialer #195

Merged
merged 6 commits into from
Mar 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,35 @@
"npm": ">=3.0.0"
},
"devDependencies": {
"aegir": "^11.0.0",
"aegir": "^11.0.1",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"dirty-chai": "^1.2.2",
"gulp": "^3.9.1",
"libp2p-multiplex": "~0.4.3",
"libp2p-secio": "~0.6.8",
"libp2p-spdy": "~0.10.6",
"libp2p-tcp": "~0.9.4",
"libp2p-tcp": "~0.10.0",
"libp2p-webrtc-star": "~0.8.10",
"libp2p-websockets": "~0.9.4",
"libp2p-websockets": "~0.10.0",
"pre-commit": "^1.2.2",
"pull-goodbye": "0.0.1",
"pull-stream": "^3.5.0",
"webrtcsupport": "^2.2.0"
},
"dependencies": {
"async": "^2.1.5",
"async": "^2.2.0",
"browserify-zlib-next": "^1.0.1",
"debug": "^2.6.3",
"interface-connection": "~0.3.2",
"ip-address": "^5.8.6",
"libp2p-identify": "~0.3.3",
"lodash.includes": "^4.3.0",
"multiaddr": "^2.2.2",
"multiaddr": "^2.2.3",
"multistream-select": "~0.13.5",
"once": "^1.4.0",
"peer-id": "~0.8.4",
"peer-info": "~0.8.4",
"peer-id": "~0.8.5",
"peer-info": "~0.8.5",
"protocol-buffers": "^3.2.1"
},
"contributors": [
Expand All @@ -86,4 +86,4 @@
"greenkeeper[bot] <greenkeeper[bot]@users.noreply.github.com>",
"ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ <[email protected]>"
]
}
}
107 changes: 87 additions & 20 deletions src/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,23 @@

const Connection = require('interface-connection').Connection
const parallel = require('async/parallel')
const queue = require('async/queue')
const timeout = require('async/timeout')
const once = require('once')
const debug = require('debug')
const log = debug('libp2p:swarm:transport')

const protocolMuxer = require('./protocol-muxer')

// number of concurrent outbound dials to make per peer, same as go-libp2p-swarm
const defaultPerPeerRateLimit = 8

// the amount of time a single dial has to succeed
const dialTimeout = 10 * 1000

module.exports = function (swarm) {
const queues = new Map()

return {
add (key, transport, options, callback) {
if (typeof options === 'function') {
Expand Down Expand Up @@ -36,32 +46,80 @@ module.exports = function (swarm) {
multiaddrs = [multiaddrs]
}
log('dialing %s', key, multiaddrs.map((m) => m.toString()))
// a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that)
// filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that)
multiaddrs = dialables(t, multiaddrs)

// b) if multiaddrs.length = 1, return the conn from the
// transport, otherwise, create a passthrough
if (multiaddrs.length === 1) {
const conn = t.dial(multiaddrs.shift(), (err) => {
if (err) return callback(err)
callback(null, new Connection(conn))
})
return
}
// create dial queue if non exists
let q
if (queues.has(key)) {
log('reusing queue')
q = queues.get(key)
} else {
log('setting up new queue')
q = queue((multiaddr, cb) => {
dialWithTimeout(t, multiaddr, dialTimeout, (err, conn) => {
if (err) {
log('dial err', err)
return cb(err)
}

if (q.canceled) {
log('dial canceled: %s', multiaddr.toString())
// clean up already done dials
if (conn) {
conn.close()
}
return cb()
}

// one is enough
log('dial success: %s', multiaddr.toString())
q.kill()
q.canceled = true

q.finish(null, conn)
})
}, defaultPerPeerRateLimit)

q.errors = []
q.finishCbs = []

// c) multiaddrs should already be a filtered list
// specific for the transport we are using
const proxyConn = new Connection()
// handle finish
q.finish = (err, conn) => {
log('queue finish')
queues.delete(key)

next(multiaddrs.shift())
q.finishCbs.forEach((next) => {
if (err) {
return next(err)
}

// TODO improve in the future to make all the dials in paralell
function next (multiaddr) {
const conn = t.dial(multiaddr, () => {
proxyConn.setInnerConn(conn)
callback(null, proxyConn)
})
const proxyConn = new Connection()
proxyConn.setInnerConn(conn)

next(null, proxyConn)
})
}

// collect errors
q.error = (err) => {
q.errors.push(err)
}

// no more addresses and all failed
q.drain = () => {
log('queue drain')
const err = new Error('Could not dial any address')
err.errors = q.errors
q.errors = []
q.finish(err)
}

queues.set(key, q)
}

q.push(multiaddrs)
q.finishCbs.push(callback)
},

listen (key, options, handler, callback) {
Expand Down Expand Up @@ -134,4 +192,13 @@ function dialables (tp, multiaddrs) {
return tp.filter(multiaddrs)
}

function dialWithTimeout (transport, multiaddr, maxTimeout, callback) {
timeout((cb) => {
const conn = transport.dial(multiaddr, (err) => {
log('dialed')
cb(err, conn)
})
}, maxTimeout)(callback)
}

function noop () {}
15 changes: 15 additions & 0 deletions test/01-transport-tcp.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ describe('transport - tcp', function () {
it('dial to set of multiaddr, only one is available', (done) => {
const conn = swarmA.transport.dial('tcp', [
multiaddr('/ip4/127.0.0.1/tcp/9910/ws'), // not valid on purpose
multiaddr('/ip4/127.0.0.1/tcp/9359'),
multiaddr('/ip4/127.0.0.1/tcp/9329'),
multiaddr('/ip4/127.0.0.1/tcp/9910'),
multiaddr('/ip4/127.0.0.1/tcp/9999'),
multiaddr('/ip4/127.0.0.1/tcp/9309')
Expand All @@ -114,6 +116,19 @@ describe('transport - tcp', function () {
)
})

it('dial to set of multiaddr, none is available', (done) => {
swarmA.transport.dial('tcp', [
multiaddr('/ip4/127.0.0.1/tcp/9910/ws'), // not valid on purpose
multiaddr('/ip4/127.0.0.1/tcp/9359'),
multiaddr('/ip4/127.0.0.1/tcp/9329')
], (err, conn) => {
expect(err).to.exist()
expect(err.errors).to.have.length(2)
expect(conn).to.not.exist()
done()
})
})

it('close', (done) => {
parallel([
(cb) => swarmA.transport.close('tcp', cb),
Expand Down
12 changes: 12 additions & 0 deletions test/03-transport-websockets.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ describe('transport - websockets', function () {
})
})

it('dial to set of multiaddr, none is available', (done) => {
swarmA.transport.dial('ws', [
multiaddr('/ip4/127.0.0.1/tcp/9320/ws'),
multiaddr('/ip4/127.0.0.1/tcp/9359/ws')
], (err, conn) => {
expect(err).to.exist()
expect(err.errors).to.have.length(2)
expect(conn).to.not.exist()
done()
})
})

it('close', (done) => {
parallel([
(cb) => swarmA.transport.close('ws', cb),
Expand Down