Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

Commit cf1a169

Browse files
dignifiedquiredryajov
authored andcommitted
Fix/dial class (#203)
* fix dialing
1 parent f47fb9b commit cf1a169

9 files changed

+350
-120
lines changed

src/dial.js

+1-2
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,7 @@ module.exports = function dial (swarm) {
8787
nextTransport(tKeys.shift())
8888

8989
function nextTransport (key) {
90-
const multiaddrs = pi.multiaddrs.slice()
91-
swarm.transport.dial(key, multiaddrs, (err, conn) => {
90+
swarm.transport.dial(key, pi, (err, conn) => {
9291
if (err) {
9392
if (tKeys.length === 0) {
9493
return cb(new Error('Could not dial in any of the transports'))

src/limit-dialer/index.js

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
'use strict'
2+
3+
const map = require('async/map')
4+
const debug = require('debug')
5+
6+
const log = debug('libp2p:swarm:dialer')
7+
8+
const DialQueue = require('./queue')
9+
10+
/**
11+
* Track dials per peer and limited them.
12+
*/
13+
class LimitDialer {
14+
/**
15+
* Create a new dialer.
16+
*
17+
* @param {number} perPeerLimit
18+
* @param {number} dialTimeout
19+
*/
20+
constructor (perPeerLimit, dialTimeout) {
21+
log('create: %s peer limit, %s dial timeout', perPeerLimit, dialTimeout)
22+
this.perPeerLimit = perPeerLimit
23+
this.dialTimeout = dialTimeout
24+
this.queues = new Map()
25+
}
26+
27+
/**
28+
* Dial a list of multiaddrs on the given transport.
29+
*
30+
* @param {PeerId} peer
31+
* @param {SwarmTransport} transport
32+
* @param {Array<Multiaddr>} addrs
33+
* @param {function(Error, Connection)} callback
34+
* @returns {void}
35+
*/
36+
dialMany (peer, transport, addrs, callback) {
37+
log('dialMany:start')
38+
// we use a token to track if we want to cancel following dials
39+
const token = {cancel: false}
40+
map(addrs, (m, cb) => {
41+
this.dialSingle(peer, transport, m, token, cb)
42+
}, (err, results) => {
43+
if (err) {
44+
return callback(err)
45+
}
46+
47+
const success = results.filter((res) => res.conn)
48+
if (success.length > 0) {
49+
log('dialMany:success')
50+
return callback(null, success[0].conn)
51+
}
52+
53+
log('dialMany:error')
54+
const error = new Error('Failed to dial any provided address')
55+
error.errors = results
56+
.filter((res) => res.error)
57+
.map((res) => res.error)
58+
return callback(error)
59+
})
60+
}
61+
62+
/**
63+
* Dial a single multiaddr on the given transport.
64+
*
65+
* @param {PeerId} peer
66+
* @param {SwarmTransport} transport
67+
* @param {Multiaddr} addr
68+
* @param {CancelToken} token
69+
* @param {function(Error, Connection)} callback
70+
* @returns {void}
71+
*/
72+
dialSingle (peer, transport, addr, token, callback) {
73+
const ps = peer.toB58String()
74+
log('dialSingle: %s:%s', ps, addr.toString())
75+
let q
76+
if (this.queues.has(ps)) {
77+
q = this.queues.get(ps)
78+
} else {
79+
q = new DialQueue(this.perPeerLimit, this.dialTimeout)
80+
this.queues.set(ps, q)
81+
}
82+
83+
q.push(transport, addr, token, callback)
84+
}
85+
}
86+
87+
module.exports = LimitDialer

src/limit-dialer/queue.js

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
'use strict'
2+
3+
const Connection = require('interface-connection').Connection
4+
const pull = require('pull-stream')
5+
const timeout = require('async/timeout')
6+
const queue = require('async/queue')
7+
const debug = require('debug')
8+
9+
const log = debug('libp2p:swarm:dialer:queue')
10+
11+
/**
12+
* Queue up the amount of dials to a given peer.
13+
*/
14+
class DialQueue {
15+
/**
16+
* Create a new dial queue.
17+
*
18+
* @param {number} limit
19+
* @param {number} dialTimeout
20+
*/
21+
constructor (limit, dialTimeout) {
22+
this.dialTimeout = dialTimeout
23+
24+
this.queue = queue((task, cb) => {
25+
this._doWork(task.transport, task.addr, task.token, cb)
26+
}, limit)
27+
}
28+
29+
/**
30+
* The actual work done by the queue.
31+
*
32+
* @param {SwarmTransport} transport
33+
* @param {Multiaddr} addr
34+
* @param {CancelToken} token
35+
* @param {function(Error, Connection)} callback
36+
* @returns {void}
37+
* @private
38+
*/
39+
_doWork (transport, addr, token, callback) {
40+
log('work')
41+
this._dialWithTimeout(
42+
transport,
43+
addr,
44+
(err, conn) => {
45+
if (err) {
46+
log('work:error')
47+
return callback(null, {error: err})
48+
}
49+
50+
if (token.cancel) {
51+
log('work:cancel')
52+
// clean up already done dials
53+
pull(pull.empty(), conn)
54+
// TODO: proper cleanup once the connection interface supports it
55+
// return conn.close(() => callback(new Error('Manual cancel'))
56+
return callback(null, {cancel: true})
57+
}
58+
59+
// one is enough
60+
token.cancel = true
61+
62+
log('work:success')
63+
64+
const proxyConn = new Connection()
65+
proxyConn.setInnerConn(conn)
66+
callback(null, {conn})
67+
}
68+
)
69+
}
70+
71+
/**
72+
* Dial the given transport, timing out with the set timeout.
73+
*
74+
* @param {SwarmTransport} transport
75+
* @param {Multiaddr} addr
76+
* @param {function(Error, Connection)} callback
77+
* @returns {void}
78+
*
79+
* @private
80+
*/
81+
_dialWithTimeout (transport, addr, callback) {
82+
timeout((cb) => {
83+
const conn = transport.dial(addr, (err) => {
84+
if (err) {
85+
return cb(err)
86+
}
87+
88+
cb(null, conn)
89+
})
90+
}, this.dialTimeout)(callback)
91+
}
92+
93+
/**
94+
* Add new work to the queue.
95+
*
96+
* @param {SwarmTransport} transport
97+
* @param {Multiaddr} addr
98+
* @param {CancelToken} token
99+
* @param {function(Error, Connection)} callback
100+
* @returns {void}
101+
*/
102+
push (transport, addr, token, callback) {
103+
this.queue.push({transport, addr, token}, callback)
104+
}
105+
}
106+
107+
module.exports = DialQueue

src/transport.js

+5-85
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
'use strict'
22

3-
const Connection = require('interface-connection').Connection
43
const parallel = require('async/parallel')
5-
const queue = require('async/queue')
6-
const timeout = require('async/timeout')
74
const once = require('once')
85
const debug = require('debug')
96
const log = debug('libp2p:swarm:transport')
107

118
const protocolMuxer = require('./protocol-muxer')
9+
const LimitDialer = require('./limit-dialer')
1210

1311
// number of concurrent outbound dials to make per peer, same as go-libp2p-swarm
1412
const defaultPerPeerRateLimit = 8
@@ -17,7 +15,7 @@ const defaultPerPeerRateLimit = 8
1715
const dialTimeout = 10 * 1000
1816

1917
module.exports = function (swarm) {
20-
const queues = new Map()
18+
const dialer = new LimitDialer(defaultPerPeerRateLimit, dialTimeout)
2119

2220
return {
2321
add (key, transport, options, callback) {
@@ -39,8 +37,9 @@ module.exports = function (swarm) {
3937
callback()
4038
},
4139

42-
dial (key, multiaddrs, callback) {
40+
dial (key, pi, callback) {
4341
const t = swarm.transports[key]
42+
let multiaddrs = pi.multiaddrs.slice()
4443

4544
if (!Array.isArray(multiaddrs)) {
4645
multiaddrs = [multiaddrs]
@@ -49,77 +48,7 @@ module.exports = function (swarm) {
4948
// filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that)
5049
multiaddrs = dialables(t, multiaddrs)
5150

52-
// create dial queue if non exists
53-
let q
54-
if (queues.has(key)) {
55-
log('reusing queue')
56-
q = queues.get(key)
57-
} else {
58-
log('setting up new queue')
59-
q = queue((multiaddr, cb) => {
60-
dialWithTimeout(t, multiaddr, dialTimeout, (err, conn) => {
61-
if (err) {
62-
log('dial err', err)
63-
return cb(err)
64-
}
65-
66-
if (q.canceled) {
67-
log('dial canceled: %s', multiaddr.toString())
68-
// clean up already done dials
69-
if (conn) {
70-
conn.close()
71-
}
72-
return cb()
73-
}
74-
75-
// one is enough
76-
log('dial success: %s', multiaddr.toString())
77-
q.kill()
78-
q.canceled = true
79-
80-
q.finish(null, conn)
81-
})
82-
}, defaultPerPeerRateLimit)
83-
84-
q.errors = []
85-
q.finishCbs = []
86-
87-
// handle finish
88-
q.finish = (err, conn) => {
89-
log('queue finish')
90-
queues.delete(key)
91-
92-
q.finishCbs.forEach((next) => {
93-
if (err) {
94-
return next(err)
95-
}
96-
97-
const proxyConn = new Connection()
98-
proxyConn.setInnerConn(conn)
99-
100-
next(null, proxyConn)
101-
})
102-
}
103-
104-
// collect errors
105-
q.error = (err) => {
106-
q.errors.push(err)
107-
}
108-
109-
// no more addresses and all failed
110-
q.drain = () => {
111-
log('queue drain')
112-
const err = new Error('Could not dial any address')
113-
err.errors = q.errors
114-
q.errors = []
115-
q.finish(err)
116-
}
117-
118-
queues.set(key, q)
119-
}
120-
121-
q.push(multiaddrs)
122-
q.finishCbs.push(callback)
51+
dialer.dialMany(pi.id, t, multiaddrs, callback)
12352
},
12453

12554
listen (key, options, handler, callback) {
@@ -192,13 +121,4 @@ function dialables (tp, multiaddrs) {
192121
return tp.filter(multiaddrs)
193122
}
194123

195-
function dialWithTimeout (transport, multiaddr, maxTimeout, callback) {
196-
timeout((cb) => {
197-
const conn = transport.dial(multiaddr, (err) => {
198-
log('dialed')
199-
cb(err, conn)
200-
})
201-
}, maxTimeout)(callback)
202-
}
203-
204124
function noop () {}

0 commit comments

Comments
 (0)