diff --git a/package.json b/package.json index 71dc35e..e646a76 100644 --- a/package.json +++ b/package.json @@ -40,7 +40,7 @@ "npm": ">=3.0.0" }, "devDependencies": { - "aegir": "^11.0.0", + "aegir": "^11.0.1", "buffer-loader": "0.0.1", "chai": "^3.5.0", "dirty-chai": "^1.2.2", @@ -48,27 +48,27 @@ "libp2p-multiplex": "~0.4.3", "libp2p-secio": "~0.6.8", "libp2p-spdy": "~0.10.6", - "libp2p-tcp": "~0.9.4", + "libp2p-tcp": "~0.10.0", "libp2p-webrtc-star": "~0.8.10", - "libp2p-websockets": "~0.9.4", + "libp2p-websockets": "~0.10.0", "pre-commit": "^1.2.2", "pull-goodbye": "0.0.1", "pull-stream": "^3.5.0", "webrtcsupport": "^2.2.0" }, "dependencies": { - "async": "^2.1.5", + "async": "^2.2.0", "browserify-zlib-next": "^1.0.1", "debug": "^2.6.3", "interface-connection": "~0.3.2", "ip-address": "^5.8.6", "libp2p-identify": "~0.3.3", "lodash.includes": "^4.3.0", - "multiaddr": "^2.2.2", + "multiaddr": "^2.2.3", "multistream-select": "~0.13.5", "once": "^1.4.0", - "peer-id": "~0.8.4", - "peer-info": "~0.8.4", + "peer-id": "~0.8.5", + "peer-info": "~0.8.5", "protocol-buffers": "^3.2.1" }, "contributors": [ @@ -86,4 +86,4 @@ "greenkeeper[bot] ", "ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ " ] -} \ No newline at end of file +} diff --git a/src/transport.js b/src/transport.js index c7beb6d..b42eb82 100644 --- a/src/transport.js +++ b/src/transport.js @@ -2,13 +2,23 @@ const Connection = require('interface-connection').Connection const parallel = require('async/parallel') +const queue = require('async/queue') +const timeout = require('async/timeout') const once = require('once') const debug = require('debug') const log = debug('libp2p:swarm:transport') const protocolMuxer = require('./protocol-muxer') +// number of concurrent outbound dials to make per peer, same as go-libp2p-swarm +const defaultPerPeerRateLimit = 8 + +// the amount of time a single dial has to succeed +const dialTimeout = 10 * 1000 + module.exports = function (swarm) { + const queues = new Map() + return { add (key, transport, options, callback) { if (typeof options === 'function') { @@ -36,32 +46,80 @@ module.exports = function (swarm) { multiaddrs = [multiaddrs] } log('dialing %s', key, multiaddrs.map((m) => m.toString())) - // a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that) + // filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that) multiaddrs = dialables(t, multiaddrs) - // b) if multiaddrs.length = 1, return the conn from the - // transport, otherwise, create a passthrough - if (multiaddrs.length === 1) { - const conn = t.dial(multiaddrs.shift(), (err) => { - if (err) return callback(err) - callback(null, new Connection(conn)) - }) - return - } + // create dial queue if non exists + let q + if (queues.has(key)) { + log('reusing queue') + q = queues.get(key) + } else { + log('setting up new queue') + q = queue((multiaddr, cb) => { + dialWithTimeout(t, multiaddr, dialTimeout, (err, conn) => { + if (err) { + log('dial err', err) + return cb(err) + } + + if (q.canceled) { + log('dial canceled: %s', multiaddr.toString()) + // clean up already done dials + if (conn) { + conn.close() + } + return cb() + } + + // one is enough + log('dial success: %s', multiaddr.toString()) + q.kill() + q.canceled = true + + q.finish(null, conn) + }) + }, defaultPerPeerRateLimit) + + q.errors = [] + q.finishCbs = [] - // c) multiaddrs should already be a filtered list - // specific for the transport we are using - const proxyConn = new Connection() + // handle finish + q.finish = (err, conn) => { + log('queue finish') + queues.delete(key) - next(multiaddrs.shift()) + q.finishCbs.forEach((next) => { + if (err) { + return next(err) + } - // TODO improve in the future to make all the dials in paralell - function next (multiaddr) { - const conn = t.dial(multiaddr, () => { - proxyConn.setInnerConn(conn) - callback(null, proxyConn) - }) + const proxyConn = new Connection() + proxyConn.setInnerConn(conn) + + next(null, proxyConn) + }) + } + + // collect errors + q.error = (err) => { + q.errors.push(err) + } + + // no more addresses and all failed + q.drain = () => { + log('queue drain') + const err = new Error('Could not dial any address') + err.errors = q.errors + q.errors = [] + q.finish(err) + } + + queues.set(key, q) } + + q.push(multiaddrs) + q.finishCbs.push(callback) }, listen (key, options, handler, callback) { @@ -134,4 +192,13 @@ function dialables (tp, multiaddrs) { return tp.filter(multiaddrs) } +function dialWithTimeout (transport, multiaddr, maxTimeout, callback) { + timeout((cb) => { + const conn = transport.dial(multiaddr, (err) => { + log('dialed') + cb(err, conn) + }) + }, maxTimeout)(callback) +} + function noop () {} diff --git a/test/01-transport-tcp.node.js b/test/01-transport-tcp.node.js index 22f8a09..f8a5991 100644 --- a/test/01-transport-tcp.node.js +++ b/test/01-transport-tcp.node.js @@ -100,6 +100,8 @@ describe('transport - tcp', function () { it('dial to set of multiaddr, only one is available', (done) => { const conn = swarmA.transport.dial('tcp', [ multiaddr('/ip4/127.0.0.1/tcp/9910/ws'), // not valid on purpose + multiaddr('/ip4/127.0.0.1/tcp/9359'), + multiaddr('/ip4/127.0.0.1/tcp/9329'), multiaddr('/ip4/127.0.0.1/tcp/9910'), multiaddr('/ip4/127.0.0.1/tcp/9999'), multiaddr('/ip4/127.0.0.1/tcp/9309') @@ -114,6 +116,19 @@ describe('transport - tcp', function () { ) }) + it('dial to set of multiaddr, none is available', (done) => { + swarmA.transport.dial('tcp', [ + multiaddr('/ip4/127.0.0.1/tcp/9910/ws'), // not valid on purpose + multiaddr('/ip4/127.0.0.1/tcp/9359'), + multiaddr('/ip4/127.0.0.1/tcp/9329') + ], (err, conn) => { + expect(err).to.exist() + expect(err.errors).to.have.length(2) + expect(conn).to.not.exist() + done() + }) + }) + it('close', (done) => { parallel([ (cb) => swarmA.transport.close('tcp', cb), diff --git a/test/03-transport-websockets.node.js b/test/03-transport-websockets.node.js index ef7a69d..90dea10 100644 --- a/test/03-transport-websockets.node.js +++ b/test/03-transport-websockets.node.js @@ -102,6 +102,18 @@ describe('transport - websockets', function () { }) }) + it('dial to set of multiaddr, none is available', (done) => { + swarmA.transport.dial('ws', [ + multiaddr('/ip4/127.0.0.1/tcp/9320/ws'), + multiaddr('/ip4/127.0.0.1/tcp/9359/ws') + ], (err, conn) => { + expect(err).to.exist() + expect(err.errors).to.have.length(2) + expect(conn).to.not.exist() + done() + }) + }) + it('close', (done) => { parallel([ (cb) => swarmA.transport.close('ws', cb),