Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

Commit 1eb28ca

Browse files
jacobheunJacob Heun
authored and
Jacob Heun
committed
fix: add utility methods to prevent already piped error
1 parent cb5245a commit 1eb28ca

File tree

1 file changed

+102
-63
lines changed

1 file changed

+102
-63
lines changed

src/dial.js

Lines changed: 102 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,53 @@ const log = debug('libp2p:switch:dial')
1111

1212
const getPeerInfo = require('./get-peer-info')
1313
const observeConnection = require('./observe-connection')
14+
const UNEXPECTED_END = 'Unexpected end of input from reader.'
15+
16+
/**
17+
* Uses the given MultistreamDialer to select the protocol matching the given key
18+
*
19+
* A helper method to catch errors from pull streams ending unexpectedly
20+
* Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged.
21+
*
22+
* @param {MultistreamDialer} msDialer a multistream.Dialer
23+
* @param {string} key The key type to select
24+
* @param {function(Error)} callback Used for standard async flow
25+
* @param {function(Error)} abort A callback to be used for ending the connection outright
26+
* @returns {void}
27+
*/
28+
function selectSafe (msDialer, key, callback, abort) {
29+
msDialer.select(key, (err, conn) => {
30+
if (err === true) {
31+
return abort(new Error(UNEXPECTED_END))
32+
}
33+
34+
callback(err, conn)
35+
})
36+
}
37+
38+
/**
39+
* Uses the given MultistreamDialer to handle the given connection
40+
*
41+
* A helper method to catch errors from pull streams ending unexpectedly
42+
* Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged
43+
*
44+
* @param {MultistreamDialer} msDialer
45+
* @param {Connection} connection The connection to handle
46+
* @param {function(Error)} callback Used for standard async flow
47+
* @param {function(Error)} abort A callback to be used for ending the connection outright
48+
* @returns {void}
49+
*/
50+
function handleSafe (msDialer, connection, callback, abort) {
51+
msDialer.handle(connection, (err) => {
52+
// Repackage errors from pull-streams ending unexpectedly.
53+
// Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged.
54+
if (err === true) {
55+
return abort(new Error(UNEXPECTED_END))
56+
}
57+
58+
callback(err)
59+
})
60+
}
1461

1562
/**
1663
* Manages dialing to another peer, including muxer upgrades
@@ -55,6 +102,11 @@ class Dialer {
55102
cb(null)
56103
}
57104
], (err, connection) => {
105+
if ((err && err.message === UNEXPECTED_END) || err === true) {
106+
log('Connection dropped for %s', this.peerInfo.id.toB58String())
107+
return this.callback(null, null)
108+
}
109+
58110
this.callback(err, connection)
59111
})
60112

@@ -160,13 +212,6 @@ class Dialer {
160212

161213
connection.setPeerInfo(this.peerInfo)
162214
this._attemptMuxerUpgrade(connection, b58Id, (err, muxer) => {
163-
// The underlying stream closed unexpectedly, so drop the connection.
164-
// Fixes https://github.com/libp2p/js-libp2p-switch/issues/235
165-
if (err && err.message === 'Unexpected end of input from reader.') {
166-
log('Connection dropped for %s', b58Id)
167-
return callback(null, null)
168-
}
169-
170215
if (err && !this.protocol) {
171216
this.switch.conns[b58Id] = connection
172217
return callback(null, null)
@@ -178,7 +223,7 @@ class Dialer {
178223
}
179224

180225
callback(null, muxer)
181-
})
226+
}, callback)
182227
}
183228

184229
/**
@@ -190,65 +235,62 @@ class Dialer {
190235
* @param {Connection} connection
191236
* @param {string} b58Id
192237
* @param {function(Error, Connection)} callback
238+
* @param {function(Error, Connection)} abort A callback to be used for ending the connection outright
193239
* @returns {void}
194240
*/
195-
_attemptMuxerUpgrade (connection, b58Id, callback) {
241+
_attemptMuxerUpgrade (connection, b58Id, callback, abort) {
196242
const muxers = Object.keys(this.switch.muxers)
243+
197244
if (muxers.length === 0) {
198245
return callback(new Error('no muxers available'))
199246
}
200247

201-
// 1. try to handshake in one of the muxers available
202-
// 2. if succeeds
203-
// - add the muxedConn to the list of muxedConns
204-
// - add incomming new streams to connHandler
205-
const nextMuxer = (key) => {
206-
log('selecting %s', key)
207-
msDialer.select(key, (err, conn) => {
208-
if (err) {
209-
if (muxers.length === 0) {
210-
return callback(new Error('could not upgrade to stream muxing'))
211-
}
248+
const msDialer = new multistream.Dialer()
249+
handleSafe(msDialer, connection, (err) => {
250+
if (err) {
251+
return callback(new Error('multistream not supported'))
252+
}
212253

213-
return nextMuxer(muxers.shift())
214-
}
254+
// 1. try to handshake in one of the muxers available
255+
// 2. if succeeds
256+
// - add the muxedConn to the list of muxedConns
257+
// - add incomming new streams to connHandler
258+
const nextMuxer = (key) => {
259+
log('selecting %s', key)
260+
selectSafe(msDialer, key, (err, conn) => {
261+
if (err) {
262+
if (muxers.length === 0) {
263+
return callback(new Error('could not upgrade to stream muxing'))
264+
}
215265

216-
const muxedConn = this.switch.muxers[key].dialer(conn)
217-
this.switch.muxedConns[b58Id] = {}
218-
this.switch.muxedConns[b58Id].muxer = muxedConn
266+
return nextMuxer(muxers.shift())
267+
}
219268

220-
muxedConn.once('close', () => {
221-
delete this.switch.muxedConns[b58Id]
222-
this.peerInfo.disconnect()
223-
this.switch._peerInfo.disconnect()
224-
setImmediate(() => this.switch.emit('peer-mux-closed', this.peerInfo))
225-
})
269+
const muxedConn = this.switch.muxers[key].dialer(conn)
270+
this.switch.muxedConns[b58Id] = {}
271+
this.switch.muxedConns[b58Id].muxer = muxedConn
226272

227-
// For incoming streams, in case identify is on
228-
muxedConn.on('stream', (conn) => {
229-
conn.setPeerInfo(this.peerInfo)
230-
this.switch.protocolMuxer(null)(conn)
231-
})
273+
muxedConn.once('close', () => {
274+
delete this.switch.muxedConns[b58Id]
275+
this.peerInfo.disconnect()
276+
this.switch._peerInfo.disconnect()
277+
setImmediate(() => this.switch.emit('peer-mux-closed', this.peerInfo))
278+
})
232279

233-
setImmediate(() => this.switch.emit('peer-mux-established', this.peerInfo))
280+
// For incoming streams, in case identify is on
281+
muxedConn.on('stream', (conn) => {
282+
conn.setPeerInfo(this.peerInfo)
283+
this.switch.protocolMuxer(null)(conn)
284+
})
234285

235-
callback(null, muxedConn)
236-
})
237-
}
286+
setImmediate(() => this.switch.emit('peer-mux-established', this.peerInfo))
238287

239-
const msDialer = new multistream.Dialer()
240-
msDialer.handle(connection, (err) => {
241-
if (err) {
242-
// Repackage errors from pull-streams ending unexpectedly.
243-
// Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged.
244-
if (err === true) {
245-
return callback(new Error('Unexpected end of input from reader.'))
246-
}
247-
return callback(new Error('multistream not supported'))
288+
callback(null, muxedConn)
289+
}, abort)
248290
}
249291

250292
nextMuxer(muxers.shift())
251-
})
293+
}, abort)
252294
}
253295

254296
/**
@@ -314,16 +356,15 @@ class Dialer {
314356
*/
315357
_encryptConnection (connection, callback) {
316358
const msDialer = new multistream.Dialer()
317-
318-
msDialer.handle(connection, (err) => {
359+
handleSafe(msDialer, connection, (err) => {
319360
if (err) {
320361
return callback(err)
321362
}
322363

323364
const myId = this.switch._peerInfo.id
324365
log('selecting crypto: %s', this.switch.crypto.tag)
325366

326-
msDialer.select(this.switch.crypto.tag, (err, _conn) => {
367+
selectSafe(msDialer, this.switch.crypto.tag, (err, _conn) => {
327368
if (err) {
328369
return callback(err)
329370
}
@@ -338,8 +379,8 @@ class Dialer {
338379
encryptedConnection.setPeerInfo(this.peerInfo)
339380
callback(null, encryptedConnection)
340381
})
341-
})
342-
})
382+
}, callback)
383+
}, callback)
343384
}
344385

345386
/**
@@ -357,17 +398,15 @@ class Dialer {
357398
}
358399

359400
const msDialer = new multistream.Dialer()
360-
msDialer.handle(connection, (err) => {
401+
handleSafe(msDialer, connection, (err) => {
361402
if (err) {
362403
return callback(err)
363404
}
364-
msDialer.select(this.protocol, (err, conn) => {
365-
if (err) {
366-
return callback(err)
367-
}
368-
callback(null, conn)
369-
})
370-
})
405+
406+
selectSafe(msDialer, this.protocol, (err, conn) => {
407+
callback(err, conn)
408+
}, callback)
409+
}, callback)
371410
}
372411
}
373412

0 commit comments

Comments
 (0)