Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

Fix/dial class #203

Merged
merged 6 commits into from
Mar 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/dial.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ module.exports = function dial (swarm) {
nextTransport(tKeys.shift())

function nextTransport (key) {
const multiaddrs = pi.multiaddrs.slice()
swarm.transport.dial(key, multiaddrs, (err, conn) => {
swarm.transport.dial(key, pi, (err, conn) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I intentionally had made swarm.transport.dial be just multiaddr aware for simplicity, and so, it would only receive a multiaddr and not a peerInfo

swarm.transport.dial -> deals with dialing on a transport
swarm.dial -> does all the magic of upgrading the connection

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of a breaking change on the API, not a biggie because it is more of an internal that is just exposed for testing, however, I'm not sure if I see the advantage, could you clarify?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am doing the same thing that go does, that is limiting dials per peer. To do that I need to know the peerId otherwise I can't track dials per peer

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, we can keep it.

if (err) {
if (tKeys.length === 0) {
return cb(new Error('Could not dial in any of the transports'))
Expand Down
87 changes: 87 additions & 0 deletions src/limit-dialer/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
'use strict'

const map = require('async/map')
const debug = require('debug')

const log = debug('libp2p:swarm:dialer')

const DialQueue = require('./queue')

/**
* Track dials per peer and limited them.
*/
class LimitDialer {
/**
* Create a new dialer.
*
* @param {number} perPeerLimit
* @param {number} dialTimeout
*/
constructor (perPeerLimit, dialTimeout) {
log('create: %s peer limit, %s dial timeout', perPeerLimit, dialTimeout)
this.perPeerLimit = perPeerLimit
this.dialTimeout = dialTimeout
this.queues = new Map()
}

/**
* Dial a list of multiaddrs on the given transport.
*
* @param {PeerId} peer
* @param {SwarmTransport} transport
* @param {Array<Multiaddr>} addrs
* @param {function(Error, Connection)} callback
* @returns {void}
*/
dialMany (peer, transport, addrs, callback) {
log('dialMany:start')
// we use a token to track if we want to cancel following dials
const token = {cancel: false}
map(addrs, (m, cb) => {
this.dialSingle(peer, transport, m, token, cb)
}, (err, results) => {
if (err) {
return callback(err)
}

const success = results.filter((res) => res.conn)
if (success.length > 0) {
log('dialMany:success')
return callback(null, success[0].conn)
}

log('dialMany:error')
const error = new Error('Failed to dial any provided address')
error.errors = results
.filter((res) => res.error)
.map((res) => res.error)
return callback(error)
})
}

/**
* Dial a single multiaddr on the given transport.
*
* @param {PeerId} peer
* @param {SwarmTransport} transport
* @param {Multiaddr} addr
* @param {CancelToken} token
* @param {function(Error, Connection)} callback
* @returns {void}
*/
dialSingle (peer, transport, addr, token, callback) {
const ps = peer.toB58String()
log('dialSingle: %s:%s', ps, addr.toString())
let q
if (this.queues.has(ps)) {
q = this.queues.get(ps)
} else {
q = new DialQueue(this.perPeerLimit, this.dialTimeout)
this.queues.set(ps, q)
}

q.push(transport, addr, token, callback)
}
}

module.exports = LimitDialer
107 changes: 107 additions & 0 deletions src/limit-dialer/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
'use strict'

const Connection = require('interface-connection').Connection
const pull = require('pull-stream')
const timeout = require('async/timeout')
const queue = require('async/queue')
const debug = require('debug')

const log = debug('libp2p:swarm:dialer:queue')

/**
* Queue up the amount of dials to a given peer.
*/
class DialQueue {
/**
* Create a new dial queue.
*
* @param {number} limit
* @param {number} dialTimeout
*/
constructor (limit, dialTimeout) {
this.dialTimeout = dialTimeout

this.queue = queue((task, cb) => {
this._doWork(task.transport, task.addr, task.token, cb)
}, limit)
}

/**
* The actual work done by the queue.
*
* @param {SwarmTransport} transport
* @param {Multiaddr} addr
* @param {CancelToken} token
* @param {function(Error, Connection)} callback
* @returns {void}
* @private
*/
_doWork (transport, addr, token, callback) {
log('work')
this._dialWithTimeout(
transport,
addr,
(err, conn) => {
if (err) {
log('work:error')
return callback(null, {error: err})
}

if (token.cancel) {
log('work:cancel')
// clean up already done dials
pull(pull.empty(), conn)
// TODO: proper cleanup once the connection interface supports it
// return conn.close(() => callback(new Error('Manual cancel'))
return callback(null, {cancel: true})
}

// one is enough
token.cancel = true

log('work:success')

const proxyConn = new Connection()
proxyConn.setInnerConn(conn)
callback(null, {conn})
}
)
}

/**
* Dial the given transport, timing out with the set timeout.
*
* @param {SwarmTransport} transport
* @param {Multiaddr} addr
* @param {function(Error, Connection)} callback
* @returns {void}
*
* @private
*/
_dialWithTimeout (transport, addr, callback) {
timeout((cb) => {
const conn = transport.dial(addr, (err) => {
if (err) {
return cb(err)
}

cb(null, conn)
})
}, this.dialTimeout)(callback)
}

/**
* Add new work to the queue.
*
* @param {SwarmTransport} transport
* @param {Multiaddr} addr
* @param {CancelToken} token
* @param {function(Error, Connection)} callback
* @returns {void}
*/
push (transport, addr, token, callback) {
this.queue.push({transport, addr, token}, callback)
}
}

module.exports = DialQueue
90 changes: 5 additions & 85 deletions src/transport.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
'use strict'

const Connection = require('interface-connection').Connection
const parallel = require('async/parallel')
const queue = require('async/queue')
const timeout = require('async/timeout')
const once = require('once')
const debug = require('debug')
const log = debug('libp2p:swarm:transport')

const protocolMuxer = require('./protocol-muxer')
const LimitDialer = require('./limit-dialer')

// number of concurrent outbound dials to make per peer, same as go-libp2p-swarm
const defaultPerPeerRateLimit = 8
Expand All @@ -17,7 +15,7 @@ const defaultPerPeerRateLimit = 8
const dialTimeout = 10 * 1000

module.exports = function (swarm) {
const queues = new Map()
const dialer = new LimitDialer(defaultPerPeerRateLimit, dialTimeout)

return {
add (key, transport, options, callback) {
Expand All @@ -39,8 +37,9 @@ module.exports = function (swarm) {
callback()
},

dial (key, multiaddrs, callback) {
dial (key, pi, callback) {
const t = swarm.transports[key]
let multiaddrs = pi.multiaddrs.slice()

if (!Array.isArray(multiaddrs)) {
multiaddrs = [multiaddrs]
Expand All @@ -49,77 +48,7 @@ module.exports = function (swarm) {
// filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that)
multiaddrs = dialables(t, multiaddrs)

// create dial queue if non exists
let q
if (queues.has(key)) {
log('reusing queue')
q = queues.get(key)
} else {
log('setting up new queue')
q = queue((multiaddr, cb) => {
dialWithTimeout(t, multiaddr, dialTimeout, (err, conn) => {
if (err) {
log('dial err', err)
return cb(err)
}

if (q.canceled) {
log('dial canceled: %s', multiaddr.toString())
// clean up already done dials
if (conn) {
conn.close()
}
return cb()
}

// one is enough
log('dial success: %s', multiaddr.toString())
q.kill()
q.canceled = true

q.finish(null, conn)
})
}, defaultPerPeerRateLimit)

q.errors = []
q.finishCbs = []

// handle finish
q.finish = (err, conn) => {
log('queue finish')
queues.delete(key)

q.finishCbs.forEach((next) => {
if (err) {
return next(err)
}

const proxyConn = new Connection()
proxyConn.setInnerConn(conn)

next(null, proxyConn)
})
}

// collect errors
q.error = (err) => {
q.errors.push(err)
}

// no more addresses and all failed
q.drain = () => {
log('queue drain')
const err = new Error('Could not dial any address')
err.errors = q.errors
q.errors = []
q.finish(err)
}

queues.set(key, q)
}

q.push(multiaddrs)
q.finishCbs.push(callback)
dialer.dialMany(pi.id, t, multiaddrs, callback)
},

listen (key, options, handler, callback) {
Expand Down Expand Up @@ -192,13 +121,4 @@ function dialables (tp, multiaddrs) {
return tp.filter(multiaddrs)
}

function dialWithTimeout (transport, multiaddr, maxTimeout, callback) {
timeout((cb) => {
const conn = transport.dial(multiaddr, (err) => {
log('dialed')
cb(err, conn)
})
}, maxTimeout)(callback)
}

function noop () {}
Loading