From 413f7e5da92d2f43441f4140beaa929827d49efa Mon Sep 17 00:00:00 2001 From: David Dias Date: Mon, 27 Mar 2017 20:37:26 +0100 Subject: [PATCH 1/6] wip --- src/transport.js | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/transport.js b/src/transport.js index b42eb82..7e212b9 100644 --- a/src/transport.js +++ b/src/transport.js @@ -2,6 +2,7 @@ const Connection = require('interface-connection').Connection const parallel = require('async/parallel') +const pull = require('pull-stream') const queue = require('async/queue') const timeout = require('async/timeout') const once = require('once') @@ -11,10 +12,10 @@ const log = debug('libp2p:swarm:transport') const protocolMuxer = require('./protocol-muxer') // number of concurrent outbound dials to make per peer, same as go-libp2p-swarm -const defaultPerPeerRateLimit = 8 +const defaultPerPeerRateLimit = 1 // 8, currently one to avoid https://github.com/libp2p/js-libp2p-swarm/pull/195#issuecomment-289497688 // the amount of time a single dial has to succeed -const dialTimeout = 10 * 1000 +const dialTimeout = 10 * 10000 module.exports = function (swarm) { const queues = new Map() @@ -67,7 +68,12 @@ module.exports = function (swarm) { log('dial canceled: %s', multiaddr.toString()) // clean up already done dials if (conn) { - conn.close() + pull( + pull.empty(), + conn + ) + + // conn.close() } return cb() } @@ -102,9 +108,7 @@ module.exports = function (swarm) { } // collect errors - q.error = (err) => { - q.errors.push(err) - } + q.error = (err) => q.errors.push(err) // no more addresses and all failed q.drain = () => { From 2cc218517368b7a812a9c1c1ef49cd2e3340c55e Mon Sep 17 00:00:00 2001 From: David Dias Date: Mon, 27 Mar 2017 20:41:18 +0100 Subject: [PATCH 2/6] y kill? --- src/transport.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport.js b/src/transport.js index 7e212b9..2a12af6 100644 --- a/src/transport.js +++ b/src/transport.js @@ -80,7 +80,7 @@ module.exports = function (swarm) { // one is enough log('dial success: %s', multiaddr.toString()) - q.kill() + q.kill() // why kill here? q.canceled = true q.finish(null, conn) From 5f1b223407bb8c93718798460a3ff92094a17286 Mon Sep 17 00:00:00 2001 From: David Dias Date: Mon, 27 Mar 2017 20:45:44 +0100 Subject: [PATCH 3/6] fix: do not reuse queues per transport --- src/transport.js | 112 +++++++++++++++++++++-------------------------- 1 file changed, 50 insertions(+), 62 deletions(-) diff --git a/src/transport.js b/src/transport.js index 2a12af6..363d74c 100644 --- a/src/transport.js +++ b/src/transport.js @@ -15,11 +15,9 @@ const protocolMuxer = require('./protocol-muxer') const defaultPerPeerRateLimit = 1 // 8, currently one to avoid https://github.com/libp2p/js-libp2p-swarm/pull/195#issuecomment-289497688 // the amount of time a single dial has to succeed -const dialTimeout = 10 * 10000 +const dialTimeout = 10 * 1000 module.exports = function (swarm) { - const queues = new Map() - return { add (key, transport, options, callback) { if (typeof options === 'function') { @@ -51,75 +49,65 @@ module.exports = function (swarm) { multiaddrs = dialables(t, multiaddrs) // create dial queue if non exists - let q - if (queues.has(key)) { - log('reusing queue') - q = queues.get(key) - } else { - log('setting up new queue') - q = queue((multiaddr, cb) => { - dialWithTimeout(t, multiaddr, dialTimeout, (err, conn) => { - if (err) { - log('dial err', err) - return cb(err) - } - - if (q.canceled) { - log('dial canceled: %s', multiaddr.toString()) - // clean up already done dials - if (conn) { - pull( - pull.empty(), - conn - ) - - // conn.close() - } - return cb() + let q = queue((multiaddr, cb) => { + dialWithTimeout(t, multiaddr, dialTimeout, (err, conn) => { + if (err) { + log('dial err', err) + return cb(err) + } + + if (q.canceled) { + log('dial canceled: %s', multiaddr.toString()) + // clean up already done dials + if (conn) { + pull( + pull.empty(), + conn + ) + + // conn.close() } + return cb() + } - // one is enough - log('dial success: %s', multiaddr.toString()) - q.kill() // why kill here? - q.canceled = true - - q.finish(null, conn) - }) - }, defaultPerPeerRateLimit) + // one is enough + log('dial success: %s', multiaddr.toString()) + q.kill() + q.canceled = true - q.errors = [] - q.finishCbs = [] + q.finish(null, conn) + }) + }, defaultPerPeerRateLimit) - // handle finish - q.finish = (err, conn) => { - log('queue finish') - queues.delete(key) + q.errors = [] + q.finishCbs = [] - q.finishCbs.forEach((next) => { - if (err) { - return next(err) - } + // handle finish + q.finish = (err, conn) => { + log('queue finish') - const proxyConn = new Connection() - proxyConn.setInnerConn(conn) + q.finishCbs.forEach((next) => { + if (err) { + return next(err) + } - next(null, proxyConn) - }) - } + const proxyConn = new Connection() + proxyConn.setInnerConn(conn) - // collect errors - q.error = (err) => q.errors.push(err) + next(null, proxyConn) + }) + } - // no more addresses and all failed - q.drain = () => { - log('queue drain') - const err = new Error('Could not dial any address') - err.errors = q.errors - q.errors = [] - q.finish(err) - } + // collect errors + q.error = (err) => q.errors.push(err) - queues.set(key, q) + // no more addresses and all failed + q.drain = () => { + log('queue drain') + const err = new Error('Could not dial any address') + err.errors = q.errors + q.errors = [] + q.finish(err) } q.push(multiaddrs) From e8a22266cf5b92576dad809da0b72663587da86b Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Tue, 28 Mar 2017 00:02:28 +0200 Subject: [PATCH 4/6] implement dialer limiting properly --- src/dial.js | 3 +- src/dialer.js | 168 +++++++++++++++++++++++ src/transport.js | 86 +----------- test/01-transport-tcp.node.js | 36 +++-- test/03-transport-websockets.node.js | 29 ++-- test/browser-00-transport-websockets.js | 16 ++- test/browser-01-transport-webrtc-star.js | 9 +- test/dialer.spec.js | 99 +++++++++++++ 8 files changed, 332 insertions(+), 114 deletions(-) create mode 100644 src/dialer.js create mode 100644 test/dialer.spec.js diff --git a/src/dial.js b/src/dial.js index 5a2d9aa..a965cbe 100644 --- a/src/dial.js +++ b/src/dial.js @@ -87,8 +87,7 @@ module.exports = function dial (swarm) { nextTransport(tKeys.shift()) function nextTransport (key) { - const multiaddrs = pi.multiaddrs.slice() - swarm.transport.dial(key, multiaddrs, (err, conn) => { + swarm.transport.dial(key, pi, (err, conn) => { if (err) { if (tKeys.length === 0) { return cb(new Error('Could not dial in any of the transports')) diff --git a/src/dialer.js b/src/dialer.js new file mode 100644 index 0000000..99ac7be --- /dev/null +++ b/src/dialer.js @@ -0,0 +1,168 @@ +'use strict' + +const Connection = require('interface-connection').Connection +const queue = require('async/queue') +const map = require('async/map') +const timeout = require('async/timeout') +const pull = require('pull-stream') + +/** + * Track dials per peer and limited them. + */ +class Dialer { + /** + * Create a new dialer. + * + * @param {number} perPeerLimit + * @param {number} dialTimeout + */ + constructor (perPeerLimit, dialTimeout) { + this.perPeerLimit = perPeerLimit + this.dialTimeout = dialTimeout + this.queues = new Map() + } + + /** + * Dial a list of multiaddrs on the given transport. + * + * @param {PeerId} peer + * @param {SwarmTransport} transport + * @param {Array} addrs + * @param {function(Error, Connection)} callback + * @returns {void} + */ + dialMany (peer, transport, addrs, callback) { + // we use a token to track if we want to cancel following dials + const token = {cancel: false} + map(addrs, (m, cb) => { + this.dialSingle(peer, transport, m, token, cb) + }, (err, results) => { + if (err) { + return callback(err) + } + + const success = results.filter((res) => res.conn) + if (success.length > 0) { + return callback(null, success[0].conn) + } + + const error = new Error('Failed to dial any provided address') + error.errors = results + .filter((res) => res.error) + .map((res) => res.error) + return callback(error) + }) + } + + /** + * Dial a single multiaddr on the given transport. + * + * @param {PeerId} peer + * @param {SwarmTransport} transport + * @param {Multiaddr} addr + * @param {CancelToken} token + * @param {function(Error, Connection)} callback + * @returns {void} + */ + dialSingle (peer, transport, addr, token, callback) { + const ps = peer.toB58String() + let q + if (this.queues.has(ps)) { + q = this.queues.get(ps) + } else { + q = new DialQueue(this.perPeerLimit, this.dialTimeout) + this.queues.set(ps, q) + } + + q.push(transport, addr, token, callback) + } +} + +/** + * Queue up the amount of dials to a given peer. + */ +class DialQueue { + /** + * Create a new dial queue. + * + * @param {number} limit + * @param {number} dialTimeout + */ + constructor (limit, dialTimeout) { + this.dialTimeout = dialTimeout + + this.queue = queue((task, cb) => { + this._doWork(task.transport, task.addr, task.token, cb) + }, limit) + } + + /** + * The actual work done by the queue. + * + * @param {SwarmTransport} transport + * @param {Multiaddr} addr + * @param {CancelToken} token + * @param {function(Error, Connection)} callback + * @returns {void} + * @private + */ + _doWork (transport, addr, token, callback) { + this._dialWithTimeout( + transport, + addr, + (err, conn) => { + if (err) { + return callback(null, {error: err}) + } + + if (token.cancel) { + // clean up already done dials + pull(pull.empty(), conn) + // TODO: proper cleanup once the connection interface supports it + // return conn.close(() => callback(new Error('Manual cancel')) + return callback(null, {cancel: true}) + } + + // one is enough + token.cancel = true + + const proxyConn = new Connection() + proxyConn.setInnerConn(conn) + callback(null, {conn}) + } + ) + } + + /** + * Dial the given transport, timing out with the set timeout. + * + * @param {SwarmTransport} transport + * @param {Multiaddr} addr + * @param {function(Error, Connection)} callback + * @returns {void} + * + * @private + */ + _dialWithTimeout (transport, addr, callback) { + timeout((cb) => { + const conn = transport.dial(addr, (err) => { + cb(err, conn) + }) + }, this.dialTimeout)(callback) + } + + /** + * Add new work to the queue. + * + * @param {SwarmTransport} transport + * @param {Multiaddr} addr + * @param {CancelToken} token + * @param {function(Error, Connection)} callback + * @returns {void} + */ + push (transport, addr, token, callback) { + this.queue.push({transport, addr, token}, callback) + } +} + +module.exports = Dialer diff --git a/src/transport.js b/src/transport.js index 363d74c..efa3025 100644 --- a/src/transport.js +++ b/src/transport.js @@ -1,23 +1,22 @@ 'use strict' -const Connection = require('interface-connection').Connection const parallel = require('async/parallel') -const pull = require('pull-stream') -const queue = require('async/queue') -const timeout = require('async/timeout') const once = require('once') const debug = require('debug') const log = debug('libp2p:swarm:transport') const protocolMuxer = require('./protocol-muxer') +const Dialer = require('./dialer') // number of concurrent outbound dials to make per peer, same as go-libp2p-swarm -const defaultPerPeerRateLimit = 1 // 8, currently one to avoid https://github.com/libp2p/js-libp2p-swarm/pull/195#issuecomment-289497688 +const defaultPerPeerRateLimit = 8 // the amount of time a single dial has to succeed const dialTimeout = 10 * 1000 module.exports = function (swarm) { + const dialer = new Dialer(defaultPerPeerRateLimit, dialTimeout) + return { add (key, transport, options, callback) { if (typeof options === 'function') { @@ -38,8 +37,9 @@ module.exports = function (swarm) { callback() }, - dial (key, multiaddrs, callback) { + dial (key, pi, callback) { const t = swarm.transports[key] + let multiaddrs = pi.multiaddrs.slice() if (!Array.isArray(multiaddrs)) { multiaddrs = [multiaddrs] @@ -48,70 +48,7 @@ module.exports = function (swarm) { // filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that) multiaddrs = dialables(t, multiaddrs) - // create dial queue if non exists - let q = queue((multiaddr, cb) => { - dialWithTimeout(t, multiaddr, dialTimeout, (err, conn) => { - if (err) { - log('dial err', err) - return cb(err) - } - - if (q.canceled) { - log('dial canceled: %s', multiaddr.toString()) - // clean up already done dials - if (conn) { - pull( - pull.empty(), - conn - ) - - // conn.close() - } - return cb() - } - - // one is enough - log('dial success: %s', multiaddr.toString()) - q.kill() - q.canceled = true - - q.finish(null, conn) - }) - }, defaultPerPeerRateLimit) - - q.errors = [] - q.finishCbs = [] - - // handle finish - q.finish = (err, conn) => { - log('queue finish') - - q.finishCbs.forEach((next) => { - if (err) { - return next(err) - } - - const proxyConn = new Connection() - proxyConn.setInnerConn(conn) - - next(null, proxyConn) - }) - } - - // collect errors - q.error = (err) => q.errors.push(err) - - // no more addresses and all failed - q.drain = () => { - log('queue drain') - const err = new Error('Could not dial any address') - err.errors = q.errors - q.errors = [] - q.finish(err) - } - - q.push(multiaddrs) - q.finishCbs.push(callback) + dialer.dialMany(pi.id, t, multiaddrs, callback) }, listen (key, options, handler, callback) { @@ -184,13 +121,4 @@ function dialables (tp, multiaddrs) { return tp.filter(multiaddrs) } -function dialWithTimeout (transport, multiaddr, maxTimeout, callback) { - timeout((cb) => { - const conn = transport.dial(multiaddr, (err) => { - log('dialed') - cb(err, conn) - }) - }, maxTimeout)(callback) -} - function noop () {} diff --git a/test/01-transport-tcp.node.js b/test/01-transport-tcp.node.js index f8a5991..ba53649 100644 --- a/test/01-transport-tcp.node.js +++ b/test/01-transport-tcp.node.js @@ -19,14 +19,16 @@ describe('transport - tcp', function () { let swarmB let peerA let peerB + let peerC before((done) => { - utils.createInfos(2, (err, infos) => { + utils.createInfos(3, (err, infos) => { if (err) { return done(err) } peerA = infos[0] peerB = infos[1] + peerC = infos[2] peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9888')) peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999')) @@ -86,7 +88,8 @@ describe('transport - tcp', function () { }) it('dial to a multiaddr', (done) => { - const conn = swarmA.transport.dial('tcp', multiaddr('/ip4/127.0.0.1/tcp/9999'), (err, conn) => { + peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999')) + const conn = swarmA.transport.dial('tcp', peerC, (err, conn) => { expect(err).to.not.exist() }) @@ -98,14 +101,16 @@ describe('transport - tcp', function () { }) it('dial to set of multiaddr, only one is available', (done) => { - const conn = swarmA.transport.dial('tcp', [ - multiaddr('/ip4/127.0.0.1/tcp/9910/ws'), // not valid on purpose - multiaddr('/ip4/127.0.0.1/tcp/9359'), - multiaddr('/ip4/127.0.0.1/tcp/9329'), - multiaddr('/ip4/127.0.0.1/tcp/9910'), - multiaddr('/ip4/127.0.0.1/tcp/9999'), - multiaddr('/ip4/127.0.0.1/tcp/9309') - ], (err, conn) => { + peerC.multiaddrs = [] + + peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9910/ws')) // not valid on purpose + peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9359')) + peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9329')) + peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9910')) + peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999')) + peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9309')) + + const conn = swarmA.transport.dial('tcp', peerC, (err, conn) => { expect(err).to.not.exist() }) @@ -117,11 +122,12 @@ describe('transport - tcp', function () { }) it('dial to set of multiaddr, none is available', (done) => { - swarmA.transport.dial('tcp', [ - multiaddr('/ip4/127.0.0.1/tcp/9910/ws'), // not valid on purpose - multiaddr('/ip4/127.0.0.1/tcp/9359'), - multiaddr('/ip4/127.0.0.1/tcp/9329') - ], (err, conn) => { + peerC.multiaddrs = [] + peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9910/ws')) // not valid on purpose + peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9359')) + peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9329')) + + swarmA.transport.dial('tcp', peerC, (err, conn) => { expect(err).to.exist() expect(err.errors).to.have.length(2) expect(conn).to.not.exist() diff --git a/test/03-transport-websockets.node.js b/test/03-transport-websockets.node.js index 90dea10..a97b89b 100644 --- a/test/03-transport-websockets.node.js +++ b/test/03-transport-websockets.node.js @@ -15,18 +15,20 @@ const utils = require('./utils') const Swarm = require('../src') describe('transport - websockets', function () { - var swarmA - var swarmB - var peerA - var peerB + let swarmA + let swarmB + let peerA + let peerB + let peerC before((done) => { - utils.createInfos(2, (err, infos) => { + utils.createInfos(3, (err, infos) => { if (err) { return done(err) } peerA = infos[0] peerB = infos[1] + peerC = infos[2] peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9888/ws')) peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999/ws/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC')) @@ -71,7 +73,9 @@ describe('transport - websockets', function () { }) it('dial', (done) => { - const conn = swarmA.transport.dial('ws', multiaddr('/ip4/127.0.0.1/tcp/9999/ws'), (err, conn) => { + peerC.multiadddrs = [] + peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999/ws')) + const conn = swarmA.transport.dial('ws', peerC, (err, conn) => { expect(err).to.not.exist() }) @@ -87,7 +91,9 @@ describe('transport - websockets', function () { }) it('dial (conn from callback)', (done) => { - swarmA.transport.dial('ws', multiaddr('/ip4/127.0.0.1/tcp/9999/ws'), (err, conn) => { + peerC.multiadddrs = [] + peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999/ws')) + swarmA.transport.dial('ws', peerC, (err, conn) => { expect(err).to.not.exist() const s = goodbye({ @@ -103,10 +109,11 @@ describe('transport - websockets', function () { }) it('dial to set of multiaddr, none is available', (done) => { - swarmA.transport.dial('ws', [ - multiaddr('/ip4/127.0.0.1/tcp/9320/ws'), - multiaddr('/ip4/127.0.0.1/tcp/9359/ws') - ], (err, conn) => { + peerC.multiadddrs = [] + peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9320/ws')) + peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9359/ws')) + + swarmA.transport.dial('ws', peerC, (err, conn) => { expect(err).to.exist() expect(err.errors).to.have.length(2) expect(conn).to.not.exist() diff --git a/test/browser-00-transport-websockets.js b/test/browser-00-transport-websockets.js index 339bb55..592dff2 100644 --- a/test/browser-00-transport-websockets.js +++ b/test/browser-00-transport-websockets.js @@ -16,13 +16,22 @@ const Swarm = require('../src') describe('transport - websockets', () => { let swarm + let peer - before(() => { + before((done) => { const b58IdSrc = 'QmYzgdesgjdvD3okTPGZT9NPmh1BuH5FfTVNKjsvaAprhb' // use a pre generated Id to save time const idSrc = Id.createFromB58String(b58IdSrc) const peerSrc = Peer(idSrc) swarm = new Swarm(peerSrc) + + Peer.create((err, p) => { + if (err) { + return done(err) + } + peer = p + done() + }) }) it('add', (done) => { @@ -33,9 +42,10 @@ describe('transport - websockets', () => { }) it('dial', (done) => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9100/ws') + peer.multiaddrs = [] + peer.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9100/ws')) - const conn = swarm.transport.dial('ws', ma, (err, conn) => { + const conn = swarm.transport.dial('ws', peer, (err, conn) => { expect(err).to.not.exist() }) diff --git a/test/browser-01-transport-webrtc-star.js b/test/browser-01-transport-webrtc-star.js index 7bf93d5..5327373 100644 --- a/test/browser-01-transport-webrtc-star.js +++ b/test/browser-01-transport-webrtc-star.js @@ -63,7 +63,7 @@ describe('transport - webrtc-star', () => { }) it('dial', (done) => { - swarm1.transport.dial('wstar', peer2.multiaddrs[0], (err, conn) => { + swarm1.transport.dial('wstar', peer2, (err, conn) => { expect(err).to.not.exist() const text = 'Hello World' @@ -78,9 +78,10 @@ describe('transport - webrtc-star', () => { ) }) }) - it('dial offline / non-exist()ent node', (done) => { - const mhOffline = multiaddr('/libp2p-webrtc-star/ip4/127.0.0.1/tcp/15555/ws/ipfs/ABCD') - swarm1.transport.dial('wstar', mhOffline, (err, conn) => { + it('dial offline / non-existent node', (done) => { + peer2.multiaddrs = [] + peer2.multiaddr.add(multiaddr('/libp2p-webrtc-star/ip4/127.0.0.1/tcp/15555/ws/ipfs/ABCD')) + swarm1.transport.dial('wstar', peer2, (err, conn) => { expect(err).to.exist() expect(conn).to.not.exist() done() diff --git a/test/dialer.spec.js b/test/dialer.spec.js new file mode 100644 index 0000000..ab97bf4 --- /dev/null +++ b/test/dialer.spec.js @@ -0,0 +1,99 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const multiaddr = require('multiaddr') +const pull = require('pull-stream') +const setImmediate = require('async/setImmediate') + +const Dialer = require('../src/dialer') +const utils = require('./utils') + +describe('Dialer', () => { + let peers + + before((done) => { + utils.createInfos(5, (err, infos) => { + if (err) { + return done(err) + } + peers = infos + + peers.forEach((peer, i) => { + peer.multiaddr.add( + multiaddr(`/ip4/191.0.0.1/tcp/123${i}`) + ) + peer.multiaddr.add( + multiaddr(`/ip4/192.168.0.1/tcp/923${i}`) + ) + peer.multiaddr.add( + multiaddr(`/ip4/193.168.0.99/tcp/923${i}`) + ) + }) + done() + }) + }) + + it('all failing', (done) => { + const dialer = new Dialer(2, 10) + + // mock transport + const t1 = { + dial (addr, cb) { + setTimeout(() => { + cb(new Error('fail')) + }, 1) + return {} + } + } + + dialer.dialMany(peers[0].id, t1, peers[0].multiaddrs, (err, conn) => { + expect(err).to.exist() + expect(err.errors).to.have.length(3) + expect(err.errors[0].message).to.eql('fail') + expect(conn).to.not.exist() + done() + }) + }) + + it('two success', (done) => { + const dialer = new Dialer(2, 10) + + // mock transport + const t1 = { + dial (addr, cb) { + const as = addr.toString() + if (as.match(/191/)) { + setImmediate(() => cb(new Error('fail'))) + return {} + } else if (as.match(/192/)) { + setTimeout(cb, 2) + return { + source: pull.values([1]), + sink: pull.drain() + } + } else if (as.match(/193/)) { + setTimeout(cb, 8) + return { + source: pull.values([2]), + sink: pull.drain() + } + } + } + } + + dialer.dialMany(peers[0].id, t1, peers[0].multiaddrs, (err, conn) => { + expect(err).to.not.exist() + pull( + conn, + pull.collect((err, res) => { + expect(err).to.not.exist() + expect(res).to.be.eql([1]) + done() + }) + ) + }) + }) +}) From 8babbfbcd6919f4672946ca63f6053b7a649e4e5 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Tue, 28 Mar 2017 11:45:27 +0200 Subject: [PATCH 5/6] test: do not reuse dialing peers --- src/dialer.js | 19 +++++++++++++- test/01-transport-tcp.node.js | 37 +++++++++++++--------------- test/03-transport-websockets.node.js | 23 ++++++++--------- 3 files changed, 45 insertions(+), 34 deletions(-) diff --git a/src/dialer.js b/src/dialer.js index 99ac7be..f5e319a 100644 --- a/src/dialer.js +++ b/src/dialer.js @@ -5,6 +5,9 @@ const queue = require('async/queue') const map = require('async/map') const timeout = require('async/timeout') const pull = require('pull-stream') +const debug = require('debug') + +const log = debug('libp2p:swarm:dialer') /** * Track dials per peer and limited them. @@ -17,6 +20,7 @@ class Dialer { * @param {number} dialTimeout */ constructor (perPeerLimit, dialTimeout) { + log('create: %s peer limit, %s dial timeout', perPeerLimit, dialTimeout) this.perPeerLimit = perPeerLimit this.dialTimeout = dialTimeout this.queues = new Map() @@ -32,6 +36,7 @@ class Dialer { * @returns {void} */ dialMany (peer, transport, addrs, callback) { + log('dialMany:start') // we use a token to track if we want to cancel following dials const token = {cancel: false} map(addrs, (m, cb) => { @@ -43,9 +48,11 @@ class Dialer { const success = results.filter((res) => res.conn) if (success.length > 0) { + log('dialMany:success') return callback(null, success[0].conn) } + log('dialMany:error') const error = new Error('Failed to dial any provided address') error.errors = results .filter((res) => res.error) @@ -66,6 +73,7 @@ class Dialer { */ dialSingle (peer, transport, addr, token, callback) { const ps = peer.toB58String() + log('dialSingle: %s:%s', ps, addr.toString()) let q if (this.queues.has(ps)) { q = this.queues.get(ps) @@ -107,15 +115,18 @@ class DialQueue { * @private */ _doWork (transport, addr, token, callback) { + log('dialQueue:work') this._dialWithTimeout( transport, addr, (err, conn) => { if (err) { + log('dialQueue:work:error') return callback(null, {error: err}) } if (token.cancel) { + log('dialQueue:work:cancel') // clean up already done dials pull(pull.empty(), conn) // TODO: proper cleanup once the connection interface supports it @@ -126,6 +137,8 @@ class DialQueue { // one is enough token.cancel = true + log('dialQueue:work:success') + const proxyConn = new Connection() proxyConn.setInnerConn(conn) callback(null, {conn}) @@ -146,7 +159,11 @@ class DialQueue { _dialWithTimeout (transport, addr, callback) { timeout((cb) => { const conn = transport.dial(addr, (err) => { - cb(err, conn) + if (err) { + return cb(err) + } + + cb(null, conn) }) }, this.dialTimeout)(callback) } diff --git a/test/01-transport-tcp.node.js b/test/01-transport-tcp.node.js index ba53649..e928fa3 100644 --- a/test/01-transport-tcp.node.js +++ b/test/01-transport-tcp.node.js @@ -19,16 +19,16 @@ describe('transport - tcp', function () { let swarmB let peerA let peerB - let peerC + let dialPeers before((done) => { - utils.createInfos(3, (err, infos) => { + utils.createInfos(5, (err, infos) => { if (err) { return done(err) } peerA = infos[0] peerB = infos[1] - peerC = infos[2] + dialPeers = infos.slice(2) peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9888')) peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999')) @@ -88,8 +88,8 @@ describe('transport - tcp', function () { }) it('dial to a multiaddr', (done) => { - peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999')) - const conn = swarmA.transport.dial('tcp', peerC, (err, conn) => { + dialPeers[0].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999')) + const conn = swarmA.transport.dial('tcp', dialPeers[0], (err, conn) => { expect(err).to.not.exist() }) @@ -101,16 +101,14 @@ describe('transport - tcp', function () { }) it('dial to set of multiaddr, only one is available', (done) => { - peerC.multiaddrs = [] - - peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9910/ws')) // not valid on purpose - peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9359')) - peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9329')) - peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9910')) - peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999')) - peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9309')) - - const conn = swarmA.transport.dial('tcp', peerC, (err, conn) => { + dialPeers[1].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9910/ws')) // not valid on purpose + dialPeers[1].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9359')) + dialPeers[1].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9329')) + dialPeers[1].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9910')) + dialPeers[1].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999')) + dialPeers[1].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9309')) + + const conn = swarmA.transport.dial('tcp', dialPeers[1], (err, conn) => { expect(err).to.not.exist() }) @@ -122,12 +120,11 @@ describe('transport - tcp', function () { }) it('dial to set of multiaddr, none is available', (done) => { - peerC.multiaddrs = [] - peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9910/ws')) // not valid on purpose - peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9359')) - peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9329')) + dialPeers[2].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9910/ws')) // not valid on purpose + dialPeers[2].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9359')) + dialPeers[2].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9329')) - swarmA.transport.dial('tcp', peerC, (err, conn) => { + swarmA.transport.dial('tcp', dialPeers[2], (err, conn) => { expect(err).to.exist() expect(err.errors).to.have.length(2) expect(conn).to.not.exist() diff --git a/test/03-transport-websockets.node.js b/test/03-transport-websockets.node.js index a97b89b..13732bc 100644 --- a/test/03-transport-websockets.node.js +++ b/test/03-transport-websockets.node.js @@ -19,16 +19,16 @@ describe('transport - websockets', function () { let swarmB let peerA let peerB - let peerC + let dialPeers before((done) => { - utils.createInfos(3, (err, infos) => { + utils.createInfos(5, (err, infos) => { if (err) { return done(err) } peerA = infos[0] peerB = infos[1] - peerC = infos[2] + dialPeers = infos.slice(2) peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9888/ws')) peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999/ws/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC')) @@ -73,9 +73,8 @@ describe('transport - websockets', function () { }) it('dial', (done) => { - peerC.multiadddrs = [] - peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999/ws')) - const conn = swarmA.transport.dial('ws', peerC, (err, conn) => { + dialPeers[0].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999/ws')) + const conn = swarmA.transport.dial('ws', dialPeers[0], (err, conn) => { expect(err).to.not.exist() }) @@ -91,9 +90,8 @@ describe('transport - websockets', function () { }) it('dial (conn from callback)', (done) => { - peerC.multiadddrs = [] - peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999/ws')) - swarmA.transport.dial('ws', peerC, (err, conn) => { + dialPeers[1].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999/ws')) + swarmA.transport.dial('ws', dialPeers[1], (err, conn) => { expect(err).to.not.exist() const s = goodbye({ @@ -109,11 +107,10 @@ describe('transport - websockets', function () { }) it('dial to set of multiaddr, none is available', (done) => { - peerC.multiadddrs = [] - peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9320/ws')) - peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9359/ws')) + dialPeers[2].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9320/ws')) + dialPeers[2].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9359/ws')) - swarmA.transport.dial('ws', peerC, (err, conn) => { + swarmA.transport.dial('ws', dialPeers[2], (err, conn) => { expect(err).to.exist() expect(err.errors).to.have.length(2) expect(conn).to.not.exist() From 1b283374adfc759f6172f58cab8b5b024d402708 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Tue, 28 Mar 2017 12:56:33 +0200 Subject: [PATCH 6/6] refactor: be less like Java --- src/limit-dialer/index.js | 87 +++++++++++++++++ src/{dialer.js => limit-dialer/queue.js} | 94 ++----------------- src/transport.js | 4 +- test/{dialer.spec.js => limit-dialer.spec.js} | 8 +- 4 files changed, 101 insertions(+), 92 deletions(-) create mode 100644 src/limit-dialer/index.js rename src/{dialer.js => limit-dialer/queue.js} (50%) rename test/{dialer.spec.js => limit-dialer.spec.js} (92%) diff --git a/src/limit-dialer/index.js b/src/limit-dialer/index.js new file mode 100644 index 0000000..ef3dd73 --- /dev/null +++ b/src/limit-dialer/index.js @@ -0,0 +1,87 @@ +'use strict' + +const map = require('async/map') +const debug = require('debug') + +const log = debug('libp2p:swarm:dialer') + +const DialQueue = require('./queue') + +/** + * Track dials per peer and limited them. + */ +class LimitDialer { + /** + * Create a new dialer. + * + * @param {number} perPeerLimit + * @param {number} dialTimeout + */ + constructor (perPeerLimit, dialTimeout) { + log('create: %s peer limit, %s dial timeout', perPeerLimit, dialTimeout) + this.perPeerLimit = perPeerLimit + this.dialTimeout = dialTimeout + this.queues = new Map() + } + + /** + * Dial a list of multiaddrs on the given transport. + * + * @param {PeerId} peer + * @param {SwarmTransport} transport + * @param {Array} addrs + * @param {function(Error, Connection)} callback + * @returns {void} + */ + dialMany (peer, transport, addrs, callback) { + log('dialMany:start') + // we use a token to track if we want to cancel following dials + const token = {cancel: false} + map(addrs, (m, cb) => { + this.dialSingle(peer, transport, m, token, cb) + }, (err, results) => { + if (err) { + return callback(err) + } + + const success = results.filter((res) => res.conn) + if (success.length > 0) { + log('dialMany:success') + return callback(null, success[0].conn) + } + + log('dialMany:error') + const error = new Error('Failed to dial any provided address') + error.errors = results + .filter((res) => res.error) + .map((res) => res.error) + return callback(error) + }) + } + + /** + * Dial a single multiaddr on the given transport. + * + * @param {PeerId} peer + * @param {SwarmTransport} transport + * @param {Multiaddr} addr + * @param {CancelToken} token + * @param {function(Error, Connection)} callback + * @returns {void} + */ + dialSingle (peer, transport, addr, token, callback) { + const ps = peer.toB58String() + log('dialSingle: %s:%s', ps, addr.toString()) + let q + if (this.queues.has(ps)) { + q = this.queues.get(ps) + } else { + q = new DialQueue(this.perPeerLimit, this.dialTimeout) + this.queues.set(ps, q) + } + + q.push(transport, addr, token, callback) + } +} + +module.exports = LimitDialer diff --git a/src/dialer.js b/src/limit-dialer/queue.js similarity index 50% rename from src/dialer.js rename to src/limit-dialer/queue.js index f5e319a..b8d70b6 100644 --- a/src/dialer.js +++ b/src/limit-dialer/queue.js @@ -1,90 +1,12 @@ 'use strict' const Connection = require('interface-connection').Connection -const queue = require('async/queue') -const map = require('async/map') -const timeout = require('async/timeout') const pull = require('pull-stream') +const timeout = require('async/timeout') +const queue = require('async/queue') const debug = require('debug') -const log = debug('libp2p:swarm:dialer') - -/** - * Track dials per peer and limited them. - */ -class Dialer { - /** - * Create a new dialer. - * - * @param {number} perPeerLimit - * @param {number} dialTimeout - */ - constructor (perPeerLimit, dialTimeout) { - log('create: %s peer limit, %s dial timeout', perPeerLimit, dialTimeout) - this.perPeerLimit = perPeerLimit - this.dialTimeout = dialTimeout - this.queues = new Map() - } - - /** - * Dial a list of multiaddrs on the given transport. - * - * @param {PeerId} peer - * @param {SwarmTransport} transport - * @param {Array} addrs - * @param {function(Error, Connection)} callback - * @returns {void} - */ - dialMany (peer, transport, addrs, callback) { - log('dialMany:start') - // we use a token to track if we want to cancel following dials - const token = {cancel: false} - map(addrs, (m, cb) => { - this.dialSingle(peer, transport, m, token, cb) - }, (err, results) => { - if (err) { - return callback(err) - } - - const success = results.filter((res) => res.conn) - if (success.length > 0) { - log('dialMany:success') - return callback(null, success[0].conn) - } - - log('dialMany:error') - const error = new Error('Failed to dial any provided address') - error.errors = results - .filter((res) => res.error) - .map((res) => res.error) - return callback(error) - }) - } - - /** - * Dial a single multiaddr on the given transport. - * - * @param {PeerId} peer - * @param {SwarmTransport} transport - * @param {Multiaddr} addr - * @param {CancelToken} token - * @param {function(Error, Connection)} callback - * @returns {void} - */ - dialSingle (peer, transport, addr, token, callback) { - const ps = peer.toB58String() - log('dialSingle: %s:%s', ps, addr.toString()) - let q - if (this.queues.has(ps)) { - q = this.queues.get(ps) - } else { - q = new DialQueue(this.perPeerLimit, this.dialTimeout) - this.queues.set(ps, q) - } - - q.push(transport, addr, token, callback) - } -} +const log = debug('libp2p:swarm:dialer:queue') /** * Queue up the amount of dials to a given peer. @@ -115,18 +37,18 @@ class DialQueue { * @private */ _doWork (transport, addr, token, callback) { - log('dialQueue:work') + log('work') this._dialWithTimeout( transport, addr, (err, conn) => { if (err) { - log('dialQueue:work:error') + log('work:error') return callback(null, {error: err}) } if (token.cancel) { - log('dialQueue:work:cancel') + log('work:cancel') // clean up already done dials pull(pull.empty(), conn) // TODO: proper cleanup once the connection interface supports it @@ -137,7 +59,7 @@ class DialQueue { // one is enough token.cancel = true - log('dialQueue:work:success') + log('work:success') const proxyConn = new Connection() proxyConn.setInnerConn(conn) @@ -182,4 +104,4 @@ class DialQueue { } } -module.exports = Dialer +module.exports = DialQueue diff --git a/src/transport.js b/src/transport.js index efa3025..d9ba45a 100644 --- a/src/transport.js +++ b/src/transport.js @@ -6,7 +6,7 @@ const debug = require('debug') const log = debug('libp2p:swarm:transport') const protocolMuxer = require('./protocol-muxer') -const Dialer = require('./dialer') +const LimitDialer = require('./limit-dialer') // number of concurrent outbound dials to make per peer, same as go-libp2p-swarm const defaultPerPeerRateLimit = 8 @@ -15,7 +15,7 @@ const defaultPerPeerRateLimit = 8 const dialTimeout = 10 * 1000 module.exports = function (swarm) { - const dialer = new Dialer(defaultPerPeerRateLimit, dialTimeout) + const dialer = new LimitDialer(defaultPerPeerRateLimit, dialTimeout) return { add (key, transport, options, callback) { diff --git a/test/dialer.spec.js b/test/limit-dialer.spec.js similarity index 92% rename from test/dialer.spec.js rename to test/limit-dialer.spec.js index ab97bf4..9613c92 100644 --- a/test/dialer.spec.js +++ b/test/limit-dialer.spec.js @@ -8,10 +8,10 @@ const multiaddr = require('multiaddr') const pull = require('pull-stream') const setImmediate = require('async/setImmediate') -const Dialer = require('../src/dialer') +const LimitDialer = require('../src/limit-dialer') const utils = require('./utils') -describe('Dialer', () => { +describe('LimitDialer', () => { let peers before((done) => { @@ -37,7 +37,7 @@ describe('Dialer', () => { }) it('all failing', (done) => { - const dialer = new Dialer(2, 10) + const dialer = new LimitDialer(2, 10) // mock transport const t1 = { @@ -59,7 +59,7 @@ describe('Dialer', () => { }) it('two success', (done) => { - const dialer = new Dialer(2, 10) + const dialer = new LimitDialer(2, 10) // mock transport const t1 = {