diff --git a/src/dial.js b/src/dial.js index 5a2d9aa..a965cbe 100644 --- a/src/dial.js +++ b/src/dial.js @@ -87,8 +87,7 @@ module.exports = function dial (swarm) { nextTransport(tKeys.shift()) function nextTransport (key) { - const multiaddrs = pi.multiaddrs.slice() - swarm.transport.dial(key, multiaddrs, (err, conn) => { + swarm.transport.dial(key, pi, (err, conn) => { if (err) { if (tKeys.length === 0) { return cb(new Error('Could not dial in any of the transports')) diff --git a/src/limit-dialer/index.js b/src/limit-dialer/index.js new file mode 100644 index 0000000..ef3dd73 --- /dev/null +++ b/src/limit-dialer/index.js @@ -0,0 +1,87 @@ +'use strict' + +const map = require('async/map') +const debug = require('debug') + +const log = debug('libp2p:swarm:dialer') + +const DialQueue = require('./queue') + +/** + * Track dials per peer and limited them. + */ +class LimitDialer { + /** + * Create a new dialer. + * + * @param {number} perPeerLimit + * @param {number} dialTimeout + */ + constructor (perPeerLimit, dialTimeout) { + log('create: %s peer limit, %s dial timeout', perPeerLimit, dialTimeout) + this.perPeerLimit = perPeerLimit + this.dialTimeout = dialTimeout + this.queues = new Map() + } + + /** + * Dial a list of multiaddrs on the given transport. + * + * @param {PeerId} peer + * @param {SwarmTransport} transport + * @param {Array} addrs + * @param {function(Error, Connection)} callback + * @returns {void} + */ + dialMany (peer, transport, addrs, callback) { + log('dialMany:start') + // we use a token to track if we want to cancel following dials + const token = {cancel: false} + map(addrs, (m, cb) => { + this.dialSingle(peer, transport, m, token, cb) + }, (err, results) => { + if (err) { + return callback(err) + } + + const success = results.filter((res) => res.conn) + if (success.length > 0) { + log('dialMany:success') + return callback(null, success[0].conn) + } + + log('dialMany:error') + const error = new Error('Failed to dial any provided address') + error.errors = results + .filter((res) => res.error) + .map((res) => res.error) + return callback(error) + }) + } + + /** + * Dial a single multiaddr on the given transport. + * + * @param {PeerId} peer + * @param {SwarmTransport} transport + * @param {Multiaddr} addr + * @param {CancelToken} token + * @param {function(Error, Connection)} callback + * @returns {void} + */ + dialSingle (peer, transport, addr, token, callback) { + const ps = peer.toB58String() + log('dialSingle: %s:%s', ps, addr.toString()) + let q + if (this.queues.has(ps)) { + q = this.queues.get(ps) + } else { + q = new DialQueue(this.perPeerLimit, this.dialTimeout) + this.queues.set(ps, q) + } + + q.push(transport, addr, token, callback) + } +} + +module.exports = LimitDialer diff --git a/src/limit-dialer/queue.js b/src/limit-dialer/queue.js new file mode 100644 index 0000000..b8d70b6 --- /dev/null +++ b/src/limit-dialer/queue.js @@ -0,0 +1,107 @@ +'use strict' + +const Connection = require('interface-connection').Connection +const pull = require('pull-stream') +const timeout = require('async/timeout') +const queue = require('async/queue') +const debug = require('debug') + +const log = debug('libp2p:swarm:dialer:queue') + +/** + * Queue up the amount of dials to a given peer. + */ +class DialQueue { + /** + * Create a new dial queue. + * + * @param {number} limit + * @param {number} dialTimeout + */ + constructor (limit, dialTimeout) { + this.dialTimeout = dialTimeout + + this.queue = queue((task, cb) => { + this._doWork(task.transport, task.addr, task.token, cb) + }, limit) + } + + /** + * The actual work done by the queue. + * + * @param {SwarmTransport} transport + * @param {Multiaddr} addr + * @param {CancelToken} token + * @param {function(Error, Connection)} callback + * @returns {void} + * @private + */ + _doWork (transport, addr, token, callback) { + log('work') + this._dialWithTimeout( + transport, + addr, + (err, conn) => { + if (err) { + log('work:error') + return callback(null, {error: err}) + } + + if (token.cancel) { + log('work:cancel') + // clean up already done dials + pull(pull.empty(), conn) + // TODO: proper cleanup once the connection interface supports it + // return conn.close(() => callback(new Error('Manual cancel')) + return callback(null, {cancel: true}) + } + + // one is enough + token.cancel = true + + log('work:success') + + const proxyConn = new Connection() + proxyConn.setInnerConn(conn) + callback(null, {conn}) + } + ) + } + + /** + * Dial the given transport, timing out with the set timeout. + * + * @param {SwarmTransport} transport + * @param {Multiaddr} addr + * @param {function(Error, Connection)} callback + * @returns {void} + * + * @private + */ + _dialWithTimeout (transport, addr, callback) { + timeout((cb) => { + const conn = transport.dial(addr, (err) => { + if (err) { + return cb(err) + } + + cb(null, conn) + }) + }, this.dialTimeout)(callback) + } + + /** + * Add new work to the queue. + * + * @param {SwarmTransport} transport + * @param {Multiaddr} addr + * @param {CancelToken} token + * @param {function(Error, Connection)} callback + * @returns {void} + */ + push (transport, addr, token, callback) { + this.queue.push({transport, addr, token}, callback) + } +} + +module.exports = DialQueue diff --git a/src/transport.js b/src/transport.js index b42eb82..d9ba45a 100644 --- a/src/transport.js +++ b/src/transport.js @@ -1,14 +1,12 @@ 'use strict' -const Connection = require('interface-connection').Connection const parallel = require('async/parallel') -const queue = require('async/queue') -const timeout = require('async/timeout') const once = require('once') const debug = require('debug') const log = debug('libp2p:swarm:transport') const protocolMuxer = require('./protocol-muxer') +const LimitDialer = require('./limit-dialer') // number of concurrent outbound dials to make per peer, same as go-libp2p-swarm const defaultPerPeerRateLimit = 8 @@ -17,7 +15,7 @@ const defaultPerPeerRateLimit = 8 const dialTimeout = 10 * 1000 module.exports = function (swarm) { - const queues = new Map() + const dialer = new LimitDialer(defaultPerPeerRateLimit, dialTimeout) return { add (key, transport, options, callback) { @@ -39,8 +37,9 @@ module.exports = function (swarm) { callback() }, - dial (key, multiaddrs, callback) { + dial (key, pi, callback) { const t = swarm.transports[key] + let multiaddrs = pi.multiaddrs.slice() if (!Array.isArray(multiaddrs)) { multiaddrs = [multiaddrs] @@ -49,77 +48,7 @@ module.exports = function (swarm) { // filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that) multiaddrs = dialables(t, multiaddrs) - // create dial queue if non exists - let q - if (queues.has(key)) { - log('reusing queue') - q = queues.get(key) - } else { - log('setting up new queue') - q = queue((multiaddr, cb) => { - dialWithTimeout(t, multiaddr, dialTimeout, (err, conn) => { - if (err) { - log('dial err', err) - return cb(err) - } - - if (q.canceled) { - log('dial canceled: %s', multiaddr.toString()) - // clean up already done dials - if (conn) { - conn.close() - } - return cb() - } - - // one is enough - log('dial success: %s', multiaddr.toString()) - q.kill() - q.canceled = true - - q.finish(null, conn) - }) - }, defaultPerPeerRateLimit) - - q.errors = [] - q.finishCbs = [] - - // handle finish - q.finish = (err, conn) => { - log('queue finish') - queues.delete(key) - - q.finishCbs.forEach((next) => { - if (err) { - return next(err) - } - - const proxyConn = new Connection() - proxyConn.setInnerConn(conn) - - next(null, proxyConn) - }) - } - - // collect errors - q.error = (err) => { - q.errors.push(err) - } - - // no more addresses and all failed - q.drain = () => { - log('queue drain') - const err = new Error('Could not dial any address') - err.errors = q.errors - q.errors = [] - q.finish(err) - } - - queues.set(key, q) - } - - q.push(multiaddrs) - q.finishCbs.push(callback) + dialer.dialMany(pi.id, t, multiaddrs, callback) }, listen (key, options, handler, callback) { @@ -192,13 +121,4 @@ function dialables (tp, multiaddrs) { return tp.filter(multiaddrs) } -function dialWithTimeout (transport, multiaddr, maxTimeout, callback) { - timeout((cb) => { - const conn = transport.dial(multiaddr, (err) => { - log('dialed') - cb(err, conn) - }) - }, maxTimeout)(callback) -} - function noop () {} diff --git a/test/01-transport-tcp.node.js b/test/01-transport-tcp.node.js index f8a5991..e928fa3 100644 --- a/test/01-transport-tcp.node.js +++ b/test/01-transport-tcp.node.js @@ -19,14 +19,16 @@ describe('transport - tcp', function () { let swarmB let peerA let peerB + let dialPeers before((done) => { - utils.createInfos(2, (err, infos) => { + utils.createInfos(5, (err, infos) => { if (err) { return done(err) } peerA = infos[0] peerB = infos[1] + dialPeers = infos.slice(2) peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9888')) peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999')) @@ -86,7 +88,8 @@ describe('transport - tcp', function () { }) it('dial to a multiaddr', (done) => { - const conn = swarmA.transport.dial('tcp', multiaddr('/ip4/127.0.0.1/tcp/9999'), (err, conn) => { + dialPeers[0].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999')) + const conn = swarmA.transport.dial('tcp', dialPeers[0], (err, conn) => { expect(err).to.not.exist() }) @@ -98,14 +101,14 @@ describe('transport - tcp', function () { }) it('dial to set of multiaddr, only one is available', (done) => { - const conn = swarmA.transport.dial('tcp', [ - multiaddr('/ip4/127.0.0.1/tcp/9910/ws'), // not valid on purpose - multiaddr('/ip4/127.0.0.1/tcp/9359'), - multiaddr('/ip4/127.0.0.1/tcp/9329'), - multiaddr('/ip4/127.0.0.1/tcp/9910'), - multiaddr('/ip4/127.0.0.1/tcp/9999'), - multiaddr('/ip4/127.0.0.1/tcp/9309') - ], (err, conn) => { + dialPeers[1].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9910/ws')) // not valid on purpose + dialPeers[1].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9359')) + dialPeers[1].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9329')) + dialPeers[1].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9910')) + dialPeers[1].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999')) + dialPeers[1].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9309')) + + const conn = swarmA.transport.dial('tcp', dialPeers[1], (err, conn) => { expect(err).to.not.exist() }) @@ -117,11 +120,11 @@ describe('transport - tcp', function () { }) it('dial to set of multiaddr, none is available', (done) => { - swarmA.transport.dial('tcp', [ - multiaddr('/ip4/127.0.0.1/tcp/9910/ws'), // not valid on purpose - multiaddr('/ip4/127.0.0.1/tcp/9359'), - multiaddr('/ip4/127.0.0.1/tcp/9329') - ], (err, conn) => { + dialPeers[2].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9910/ws')) // not valid on purpose + dialPeers[2].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9359')) + dialPeers[2].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9329')) + + swarmA.transport.dial('tcp', dialPeers[2], (err, conn) => { expect(err).to.exist() expect(err.errors).to.have.length(2) expect(conn).to.not.exist() diff --git a/test/03-transport-websockets.node.js b/test/03-transport-websockets.node.js index 90dea10..13732bc 100644 --- a/test/03-transport-websockets.node.js +++ b/test/03-transport-websockets.node.js @@ -15,18 +15,20 @@ const utils = require('./utils') const Swarm = require('../src') describe('transport - websockets', function () { - var swarmA - var swarmB - var peerA - var peerB + let swarmA + let swarmB + let peerA + let peerB + let dialPeers before((done) => { - utils.createInfos(2, (err, infos) => { + utils.createInfos(5, (err, infos) => { if (err) { return done(err) } peerA = infos[0] peerB = infos[1] + dialPeers = infos.slice(2) peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9888/ws')) peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999/ws/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC')) @@ -71,7 +73,8 @@ describe('transport - websockets', function () { }) it('dial', (done) => { - const conn = swarmA.transport.dial('ws', multiaddr('/ip4/127.0.0.1/tcp/9999/ws'), (err, conn) => { + dialPeers[0].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999/ws')) + const conn = swarmA.transport.dial('ws', dialPeers[0], (err, conn) => { expect(err).to.not.exist() }) @@ -87,7 +90,8 @@ describe('transport - websockets', function () { }) it('dial (conn from callback)', (done) => { - swarmA.transport.dial('ws', multiaddr('/ip4/127.0.0.1/tcp/9999/ws'), (err, conn) => { + dialPeers[1].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999/ws')) + swarmA.transport.dial('ws', dialPeers[1], (err, conn) => { expect(err).to.not.exist() const s = goodbye({ @@ -103,10 +107,10 @@ describe('transport - websockets', function () { }) it('dial to set of multiaddr, none is available', (done) => { - swarmA.transport.dial('ws', [ - multiaddr('/ip4/127.0.0.1/tcp/9320/ws'), - multiaddr('/ip4/127.0.0.1/tcp/9359/ws') - ], (err, conn) => { + dialPeers[2].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9320/ws')) + dialPeers[2].multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9359/ws')) + + swarmA.transport.dial('ws', dialPeers[2], (err, conn) => { expect(err).to.exist() expect(err.errors).to.have.length(2) expect(conn).to.not.exist() diff --git a/test/browser-00-transport-websockets.js b/test/browser-00-transport-websockets.js index 339bb55..592dff2 100644 --- a/test/browser-00-transport-websockets.js +++ b/test/browser-00-transport-websockets.js @@ -16,13 +16,22 @@ const Swarm = require('../src') describe('transport - websockets', () => { let swarm + let peer - before(() => { + before((done) => { const b58IdSrc = 'QmYzgdesgjdvD3okTPGZT9NPmh1BuH5FfTVNKjsvaAprhb' // use a pre generated Id to save time const idSrc = Id.createFromB58String(b58IdSrc) const peerSrc = Peer(idSrc) swarm = new Swarm(peerSrc) + + Peer.create((err, p) => { + if (err) { + return done(err) + } + peer = p + done() + }) }) it('add', (done) => { @@ -33,9 +42,10 @@ describe('transport - websockets', () => { }) it('dial', (done) => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9100/ws') + peer.multiaddrs = [] + peer.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9100/ws')) - const conn = swarm.transport.dial('ws', ma, (err, conn) => { + const conn = swarm.transport.dial('ws', peer, (err, conn) => { expect(err).to.not.exist() }) diff --git a/test/browser-01-transport-webrtc-star.js b/test/browser-01-transport-webrtc-star.js index 7bf93d5..5327373 100644 --- a/test/browser-01-transport-webrtc-star.js +++ b/test/browser-01-transport-webrtc-star.js @@ -63,7 +63,7 @@ describe('transport - webrtc-star', () => { }) it('dial', (done) => { - swarm1.transport.dial('wstar', peer2.multiaddrs[0], (err, conn) => { + swarm1.transport.dial('wstar', peer2, (err, conn) => { expect(err).to.not.exist() const text = 'Hello World' @@ -78,9 +78,10 @@ describe('transport - webrtc-star', () => { ) }) }) - it('dial offline / non-exist()ent node', (done) => { - const mhOffline = multiaddr('/libp2p-webrtc-star/ip4/127.0.0.1/tcp/15555/ws/ipfs/ABCD') - swarm1.transport.dial('wstar', mhOffline, (err, conn) => { + it('dial offline / non-existent node', (done) => { + peer2.multiaddrs = [] + peer2.multiaddr.add(multiaddr('/libp2p-webrtc-star/ip4/127.0.0.1/tcp/15555/ws/ipfs/ABCD')) + swarm1.transport.dial('wstar', peer2, (err, conn) => { expect(err).to.exist() expect(conn).to.not.exist() done() diff --git a/test/limit-dialer.spec.js b/test/limit-dialer.spec.js new file mode 100644 index 0000000..9613c92 --- /dev/null +++ b/test/limit-dialer.spec.js @@ -0,0 +1,99 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const multiaddr = require('multiaddr') +const pull = require('pull-stream') +const setImmediate = require('async/setImmediate') + +const LimitDialer = require('../src/limit-dialer') +const utils = require('./utils') + +describe('LimitDialer', () => { + let peers + + before((done) => { + utils.createInfos(5, (err, infos) => { + if (err) { + return done(err) + } + peers = infos + + peers.forEach((peer, i) => { + peer.multiaddr.add( + multiaddr(`/ip4/191.0.0.1/tcp/123${i}`) + ) + peer.multiaddr.add( + multiaddr(`/ip4/192.168.0.1/tcp/923${i}`) + ) + peer.multiaddr.add( + multiaddr(`/ip4/193.168.0.99/tcp/923${i}`) + ) + }) + done() + }) + }) + + it('all failing', (done) => { + const dialer = new LimitDialer(2, 10) + + // mock transport + const t1 = { + dial (addr, cb) { + setTimeout(() => { + cb(new Error('fail')) + }, 1) + return {} + } + } + + dialer.dialMany(peers[0].id, t1, peers[0].multiaddrs, (err, conn) => { + expect(err).to.exist() + expect(err.errors).to.have.length(3) + expect(err.errors[0].message).to.eql('fail') + expect(conn).to.not.exist() + done() + }) + }) + + it('two success', (done) => { + const dialer = new LimitDialer(2, 10) + + // mock transport + const t1 = { + dial (addr, cb) { + const as = addr.toString() + if (as.match(/191/)) { + setImmediate(() => cb(new Error('fail'))) + return {} + } else if (as.match(/192/)) { + setTimeout(cb, 2) + return { + source: pull.values([1]), + sink: pull.drain() + } + } else if (as.match(/193/)) { + setTimeout(cb, 8) + return { + source: pull.values([2]), + sink: pull.drain() + } + } + } + } + + dialer.dialMany(peers[0].id, t1, peers[0].multiaddrs, (err, conn) => { + expect(err).to.not.exist() + pull( + conn, + pull.collect((err, res) => { + expect(err).to.not.exist() + expect(res).to.be.eql([1]) + done() + }) + ) + }) + }) +})