Skip to content

Commit 4c88bcd

Browse files
committed
refactor: switch to async iterators
BREAKING CHANGE: Switch to using async/await and async iterators. The transport and connection interfaces have changed.
1 parent 4e9f068 commit 4c88bcd

9 files changed

+299
-149
lines changed

.travis.yml

-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ jobs:
2121
include:
2222
- stage: check
2323
script:
24-
- npx aegir commitlint --travis
2524
- npx aegir dep-check
2625
- npm run lint
2726

README.md

+10
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,16 @@ js-libp2p-utp
2020

2121
[Vasco Santos](https://github.com/vasco-santos).
2222

23+
## API
24+
25+
### Transport
26+
27+
[![](https://raw.githubusercontent.com/libp2p/interface-transport/master/img/badge.png)](https://github.com/libp2p/interface-transport)
28+
29+
### Connection
30+
31+
[![](https://raw.githubusercontent.com/libp2p/interface-connection/master/img/badge.png)](https://github.com/libp2p/interface-connection)
32+
2333
# Acknowledgements
2434

2535
`js-libp2p-utp` is a wrapper on top on [utp](https://github.com/mafintosh/utp) originally developed by [Mathias Buus](https://github.com/mafintosh)

package.json

+10-5
Original file line numberDiff line numberDiff line change
@@ -27,23 +27,28 @@
2727
},
2828
"homepage": "https://github.com/libp2p/js-libp2p-utp",
2929
"devDependencies": {
30-
"aegir": "^18.2.2",
30+
"aegir": "^20.0.0",
3131
"chai": "^4.2.0",
3232
"dirty-chai": "^2.0.1"
3333
},
3434
"dependencies": {
35+
"abortable-iterator": "^2.1.0",
3536
"class-is": "^1.1.0",
3637
"debug": "^4.1.1",
38+
"err-code": "^2.0.0",
3739
"interface-connection": "~0.3.3",
38-
"ip-address": "^5.9.0",
40+
"interface-transport": "^0.5.2",
41+
"ip-address": "^6.1.0",
42+
"it-pipe": "^1.0.1",
3943
"lodash.includes": "^4.3.0",
4044
"lodash.isfunction": "^3.0.9",
41-
"mafmt": "^6.0.7",
42-
"multiaddr": "^6.0.6",
45+
"mafmt": "^6.0.8",
46+
"multiaddr": "^6.1.0",
4347
"once": "^1.4.0",
4448
"pull-stream": "^3.6.11",
4549
"stream-to-pull-stream": "^1.7.3",
46-
"utp-native": "^2.1.3"
50+
"streaming-iterables": "^4.1.0",
51+
"utp-native": "^2.1.4"
4752
},
4853
"contributors": [
4954
"David Dias <[email protected]>",

src/adapter.js

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
'use strict'
2+
3+
const { Adapter } = require('interface-transport')
4+
const withIs = require('class-is')
5+
const UTP = require('.')
6+
7+
// Legacy adapter to old transport & connection interface
8+
class UTPAdapter extends Adapter {
9+
constructor () {
10+
super(new UTP())
11+
}
12+
}
13+
14+
module.exports = withIs(UTPAdapter, {
15+
className: 'UTP',
16+
symbolName: '@libp2p/js-libp2p-utp/utp'
17+
})

src/constants.js

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
'use strict'
2+
3+
// IPFS multi-address code
4+
module.exports.IPFS_MA_CODE = 421
5+
6+
// Time to wait for a connection to close gracefully before destroying it
7+
// manually
8+
module.exports.CLOSE_TIMEOUT = 2000

src/create-listener.js

+50-40
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,37 @@
11
'use strict'
22

3+
const debug = require('debug')
4+
const log = debug('libp2p:utp:listen')
5+
const logError = debug('libp2p:utp:listen:error')
6+
37
const multiaddr = require('multiaddr')
4-
const Connection = require('interface-connection').Connection
5-
// const os = require('os')
68
const includes = require('lodash.includes')
79
const utp = require('utp-native')
8-
const toPull = require('stream-to-pull-stream')
9-
const EventEmitter = require('events').EventEmitter
10-
const debug = require('debug')
11-
const log = debug('libp2p:utp')
10+
const { EventEmitter } = require('events')
1211

12+
const Libp2pSocket = require('./socket')
1313
const getMultiaddr = require('./get-multiaddr')
1414

15-
const IPFS_CODE = 421
16-
const CLOSE_TIMEOUT = 2000
17-
18-
function noop () {}
15+
const { IPFS_MA_CODE, CLOSE_TIMEOUT } = require('./constants')
1916

2017
module.exports = (handler) => {
2118
const listener = new EventEmitter()
2219

2320
const server = utp.createServer((socket) => {
2421
// Avoid uncaught errors cause by unstable connections
25-
socket.on('error', noop)
22+
socket.on('error', (err) => {
23+
logError('Error emitted by server handler socket: ' + err.message)
24+
})
2625

2726
const addr = getMultiaddr(socket)
28-
29-
const s = toPull.duplex(socket)
27+
const s = new Libp2pSocket(socket, addr)
3028

3129
s.getObservedAddrs = (cb) => cb(null, [addr])
3230

3331
trackSocket(server, socket)
3432

35-
const conn = new Connection(s)
36-
handler(conn)
37-
listener.emit('connection', conn)
33+
handler && handler(s)
34+
listener.emit('connection', s)
3835
})
3936

4037
server.on('listening', () => listener.emit('listening'))
@@ -44,57 +41,70 @@ module.exports = (handler) => {
4441
// Keep track of open connections to destroy in case of timeout
4542
server.__connections = {}
4643

47-
listener.close = (options, callback) => {
48-
if (typeof options === 'function') {
49-
callback = options
50-
options = {}
44+
listener.close = (options = {}) => {
45+
if (!server.listening) {
46+
return
5147
}
52-
callback = callback || noop
53-
options = options || {}
54-
55-
const timeout = setTimeout(() => {
56-
log('unable to close graciously, destroying conns')
57-
Object.keys(server.__connections).forEach((key) => {
58-
log('destroying %s', key)
59-
server.__connections[key].destroy()
60-
})
61-
}, options.timeout || CLOSE_TIMEOUT)
6248

63-
server.close(callback)
49+
return new Promise((resolve, reject) => {
50+
const start = Date.now()
6451

65-
server.once('close', () => {
66-
clearTimeout(timeout)
52+
// Attempt to stop the server. If it takes longer than the timeout,
53+
// destroy all the underlying sockets manually.
54+
const timeout = setTimeout(() => {
55+
log('Timeout closing server after %dms, destroying connections manually', Date.now() - start)
56+
Object.keys(server.__connections).forEach((key) => {
57+
log('destroying %s', key)
58+
server.__connections[key].destroy()
59+
})
60+
resolve()
61+
}, options.timeout || CLOSE_TIMEOUT)
62+
63+
server.once('close', () => clearTimeout(timeout))
64+
65+
server.close((err) => err ? reject(err) : resolve())
6766
})
6867
}
6968

7069
let ipfsId
7170
let listeningAddr
7271

73-
listener.listen = (ma, callback) => {
72+
listener.listen = (ma) => {
7473
listeningAddr = ma
7574
if (includes(ma.protoNames(), 'ipfs')) {
7675
ipfsId = getIpfsId(ma)
7776
listeningAddr = ma.decapsulate('ipfs')
7877
}
7978

8079
const lOpts = listeningAddr.toOptions()
81-
log('Listening on %s %s', lOpts.port, lOpts.host)
82-
server.listen(Number(lOpts.port), lOpts.host, callback)
80+
81+
return new Promise((resolve, reject) => {
82+
server.listen(Number(lOpts.port), lOpts.host, (err) => {
83+
if (err) {
84+
return reject(err)
85+
}
86+
87+
log('Listening on %s %s', lOpts.port, lOpts.host)
88+
resolve()
89+
})
90+
})
8391
}
8492

85-
listener.getAddrs = (callback) => {
93+
listener.getAddrs = () => {
8694
const multiaddrs = []
8795
const addr = server.address()
8896

8997
if (!addr) {
90-
return callback(new Error('Listener is not ready yet'))
98+
throw new Error('Listener is not ready yet')
9199
}
92100

93101
let ma
94102
if (addr.family === 'IPv6') {
95103
ma = multiaddr(`/ip6/${addr.address}/udp/${addr.port}/utp`)
96104
} else if (addr.family === 'IPv4') {
105+
/* eslint-disable no-console */
97106
console.log(`/ip4/${addr.address}/udp/${addr.port}/utp`)
107+
/* eslint-enable no-console */
98108
ma = multiaddr(`/ip4/${addr.address}/udp/${addr.port}/utp`)
99109
}
100110

@@ -104,15 +114,15 @@ module.exports = (handler) => {
104114

105115
multiaddrs.push(ma)
106116

107-
callback(null, multiaddrs)
117+
return multiaddrs
108118
}
109119

110120
return listener
111121
}
112122

113123
function getIpfsId (ma) {
114124
return ma.stringTuples().filter((tuple) => {
115-
return tuple[0] === IPFS_CODE
125+
return tuple[0] === IPFS_MA_CODE
116126
})[0][1]
117127
}
118128

src/index.js

+50-29
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,75 @@
11
'use strict'
22

3+
const debug = require('debug')
4+
const log = debug('libp2p:utp')
5+
const errcode = require('err-code')
6+
37
const utp = require('utp-native')
4-
const toPull = require('stream-to-pull-stream')
58
const mafmt = require('mafmt')
69
const withIs = require('class-is')
710
const includes = require('lodash.includes')
811
const isFunction = require('lodash.isfunction')
9-
const Connection = require('interface-connection').Connection
10-
const once = require('once')
12+
const { AbortError } = require('interface-transport')
13+
const Libp2pSocket = require('./socket')
1114
const createListener = require('./create-listener.js')
12-
const debug = require('debug')
13-
const log = debug('libp2p:utp')
1415

1516
function noop () {}
1617

1718
class UTP {
18-
dial (ma, options, callback) {
19-
if (isFunction(options)) {
20-
callback = options
21-
options = {}
22-
}
23-
24-
callback = once(callback || noop)
19+
async dial (ma, options = {}) {
20+
const rawSocket = await this._connect(ma, options)
21+
return new Libp2pSocket(rawSocket, ma, options)
22+
}
2523

24+
_connect (ma, options) {
2625
const cOpts = ma.toOptions()
27-
log('Connecting (UTP) to %s %s', cOpts.port, cOpts.host)
26+
log('Dialing %s:%s', cOpts.host, cOpts.port)
2827

29-
const rawSocket = utp.connect(cOpts)
28+
return new Promise((resolve, reject) => {
29+
if ((options.signal || {}).aborted) {
30+
return reject(new AbortError())
31+
}
3032

31-
rawSocket.once('timeout', () => {
32-
log('timeout')
33-
rawSocket.emit('error', new Error('Timeout'))
34-
})
33+
const start = Date.now()
34+
const rawSocket = utp.connect(cOpts)
3535

36-
rawSocket.once('error', callback)
36+
const onError = (err) => {
37+
const msg = `Error dialing ${cOpts.host}:${cOpts.port}: ${err.message}`
38+
done(errcode(new Error(msg), err.code))
39+
}
3740

38-
rawSocket.once('connect', () => {
39-
rawSocket.removeListener('error', callback)
40-
callback()
41-
})
41+
const onTimeout = () => {
42+
log('Timeout dialing %s:%s', cOpts.host, cOpts.port)
43+
const err = errcode(new Error(`Timeout after ${Date.now() - start}ms`), 'ETIMEDOUT')
44+
// Note: this will result in onError() being called
45+
rawSocket.emit('error', err)
46+
}
47+
48+
const onConnect = () => {
49+
log('Connected to %s:%s', cOpts.host, cOpts.port)
50+
done(null, rawSocket)
51+
}
4252

43-
const socket = toPull.duplex(rawSocket)
53+
const onAbort = () => {
54+
log('Dial to %s:%s aborted', cOpts.host, cOpts.port)
55+
done(new AbortError())
56+
}
4457

45-
const conn = new Connection(socket)
58+
const done = (err, res) => {
59+
rawSocket.removeListener('error', onError)
60+
rawSocket.removeListener('timeout', onTimeout)
61+
rawSocket.removeListener('connect', onConnect)
62+
options.signal && options.signal.removeEventListener('abort', onAbort)
4663

47-
conn.getObservedAddrs = (callback) => {
48-
return callback(null, [ma])
49-
}
64+
err ? reject(err) : resolve(res)
65+
}
5066

51-
return conn
67+
rawSocket.once('error', onError)
68+
rawSocket.once('timeout', onTimeout)
69+
rawSocket.once('connect', onConnect)
70+
rawSocket.on('close', () => rawSocket.destroy())
71+
options.signal && options.signal.addEventListener('abort', onAbort)
72+
})
5273
}
5374

5475
createListener (options, handler) {

0 commit comments

Comments
 (0)