|
1 | 1 | 'use strict'
|
2 | 2 |
|
| 3 | +const assert = require('assert') |
3 | 4 | const debug = require('debug')
|
4 | 5 | const log = debug('libp2p:utp')
|
5 | 6 | const errcode = require('err-code')
|
6 | 7 |
|
7 | 8 | const utp = require('utp-native')
|
8 | 9 | const mafmt = require('mafmt')
|
9 | 10 | const withIs = require('class-is')
|
10 |
| -const includes = require('lodash.includes') |
11 |
| -const isFunction = require('lodash.isfunction') |
12 |
| -const { AbortError } = require('interface-transport') |
13 |
| -const Libp2pSocket = require('./socket') |
14 |
| -const createListener = require('./create-listener.js') |
| 11 | +const { AbortError } = require('abortable-iterator') |
| 12 | + |
| 13 | +const { CODE_CIRCUIT, CODE_P2P, CLOSE_TIMEOUT } = require('./constants') |
| 14 | +const createListener = require('./listener.js') |
| 15 | +const toConnection = require('./socket-to-conn') |
15 | 16 |
|
16 | 17 | function noop () {}
|
17 | 18 |
|
| 19 | +/** |
| 20 | + * @class UTP |
| 21 | + */ |
18 | 22 | class UTP {
|
| 23 | + /** |
| 24 | + * @constructor |
| 25 | + * @param {object} options |
| 26 | + * @param {Upgrader} options.upgrader |
| 27 | + */ |
| 28 | + constructor ({ upgrader }) { |
| 29 | + assert(upgrader, 'An upgrader must be provided. See https://github.com/libp2p/interface-transport#upgrader.') |
| 30 | + this._upgrader = upgrader |
| 31 | + } |
| 32 | + |
| 33 | + /** |
| 34 | + * @async |
| 35 | + * @param {Multiaddr} ma |
| 36 | + * @param {object} options |
| 37 | + * @param {AbortSignal} options.signal Used to abort dial requests |
| 38 | + * @returns {Connection} An upgraded Connection |
| 39 | + */ |
19 | 40 | async dial (ma, options = {}) {
|
20 |
| - const rawSocket = await this._connect(ma, options) |
21 |
| - return new Libp2pSocket(rawSocket, ma, options) |
| 41 | + const rawConn = await this._connect(ma, options) |
| 42 | + const maConn = toConnection(rawConn, { remoteAddr: ma, signal: options.signal }) |
| 43 | + log('new outbound connection %s', maConn.remoteAddr) |
| 44 | + const conn = await this._upgrader.upgradeOutbound(maConn) |
| 45 | + log('outbound connection %s upgraded', maConn.remoteAddr) |
| 46 | + return conn |
22 | 47 | }
|
23 | 48 |
|
24 |
| - _connect (ma, options) { |
| 49 | + /** |
| 50 | + * @private |
| 51 | + * @param {Multiaddr} ma |
| 52 | + * @param {object} options |
| 53 | + * @param {AbortSignal} options.signal Used to abort dial requests |
| 54 | + * @returns {Promise<UTP>} Resolves a UTP Socket |
| 55 | + */ |
| 56 | + _connect (ma, options = {}) { |
| 57 | + if (options.signal && options.signal.aborted) { |
| 58 | + throw new AbortError() |
| 59 | + } |
| 60 | + |
25 | 61 | const cOpts = ma.toOptions()
|
26 |
| - log('Dialing %s:%s', cOpts.host, cOpts.port) |
| 62 | + log('dialing %s:%s', cOpts.host, cOpts.port) |
27 | 63 |
|
28 | 64 | return new Promise((resolve, reject) => {
|
29 |
| - if ((options.signal || {}).aborted) { |
30 |
| - return reject(new AbortError()) |
31 |
| - } |
32 |
| - |
33 | 65 | const start = Date.now()
|
34 |
| - const rawSocket = utp.connect(cOpts) |
| 66 | + const rawSocket = utp.connect(Number(cOpts.port), cOpts.host) |
35 | 67 |
|
36 | 68 | const onError = (err) => {
|
37 |
| - const msg = `Error dialing ${cOpts.host}:${cOpts.port}: ${err.message}` |
38 |
| - done(errcode(new Error(msg), err.code)) |
| 69 | + err.message = `connection error ${cOpts.host}:${cOpts.port}: ${err.message}` |
| 70 | + done(err) |
39 | 71 | }
|
40 | 72 |
|
41 | 73 | const onTimeout = () => {
|
42 |
| - log('Timeout dialing %s:%s', cOpts.host, cOpts.port) |
43 |
| - const err = errcode(new Error(`Timeout after ${Date.now() - start}ms`), 'ETIMEDOUT') |
| 74 | + log('connnection timeout %s:%s', cOpts.host, cOpts.port) |
| 75 | + const err = errcode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT') |
44 | 76 | // Note: this will result in onError() being called
|
45 | 77 | rawSocket.emit('error', err)
|
46 | 78 | }
|
47 | 79 |
|
48 | 80 | const onConnect = () => {
|
49 |
| - log('Connected to %s:%s', cOpts.host, cOpts.port) |
50 |
| - done(null, rawSocket) |
| 81 | + log('connection opened %s:%s', cOpts.host, cOpts.port) |
| 82 | + done(null) |
51 | 83 | }
|
52 | 84 |
|
53 | 85 | const onAbort = () => {
|
54 |
| - log('Dial to %s:%s aborted', cOpts.host, cOpts.port) |
| 86 | + log('connection aborted %s:%s', cOpts.host, cOpts.port) |
55 | 87 | done(new AbortError())
|
56 | 88 | }
|
57 | 89 |
|
58 |
| - const done = (err, res) => { |
| 90 | + const done = (err) => { |
59 | 91 | rawSocket.removeListener('error', onError)
|
60 | 92 | rawSocket.removeListener('timeout', onTimeout)
|
61 | 93 | rawSocket.removeListener('connect', onConnect)
|
62 | 94 | options.signal && options.signal.removeEventListener('abort', onAbort)
|
63 | 95 |
|
64 |
| - err ? reject(err) : resolve(res) |
| 96 | + err ? reject(err) : resolve(rawSocket) |
65 | 97 | }
|
66 | 98 |
|
67 | 99 | rawSocket.once('error', onError)
|
68 |
| - rawSocket.once('timeout', onTimeout) |
69 | 100 | rawSocket.once('connect', onConnect)
|
70 |
| - rawSocket.on('close', () => rawSocket.destroy()) |
| 101 | + rawSocket.setTimeout(CLOSE_TIMEOUT, onTimeout) |
71 | 102 | options.signal && options.signal.addEventListener('abort', onAbort)
|
72 | 103 | })
|
73 | 104 | }
|
74 | 105 |
|
| 106 | + /** |
| 107 | + * Creates a UTP listener. The provided `handler` function will be called |
| 108 | + * anytime a new incoming Connection has been successfully upgraded via |
| 109 | + * `upgrader.upgradeInbound`. |
| 110 | + * @param {object} [options] |
| 111 | + * @param {function (Connection)} handler |
| 112 | + * @returns {Listener} A UTP listener |
| 113 | + */ |
75 | 114 | createListener (options, handler) {
|
76 |
| - if (isFunction(options)) { |
| 115 | + if (typeof options === 'function') { |
77 | 116 | handler = options
|
78 | 117 | options = {}
|
79 | 118 | }
|
80 | 119 |
|
81 | 120 | handler = handler || noop
|
82 | 121 |
|
83 |
| - return createListener(handler) |
| 122 | + return createListener({ handler, upgrader: this._upgrader }, options) |
84 | 123 | }
|
85 | 124 |
|
| 125 | + /** |
| 126 | + * Takes a list of `Multiaddr`s and returns only valid UTP addresses |
| 127 | + * @param {Multiaddr[]} multiaddrs |
| 128 | + * @returns {Multiaddr[]} Valid UTP multiaddrs |
| 129 | + */ |
86 | 130 | filter (multiaddrs) {
|
87 |
| - if (!Array.isArray(multiaddrs)) { |
88 |
| - multiaddrs = [multiaddrs] |
89 |
| - } |
| 131 | + multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] |
90 | 132 |
|
91 | 133 | return multiaddrs.filter((ma) => {
|
92 |
| - if (includes(ma.protoNames(), 'p2p-circuit')) { |
| 134 | + if (ma.protoCodes().includes(CODE_CIRCUIT)) { |
93 | 135 | return false
|
94 | 136 | }
|
95 | 137 |
|
96 |
| - if (includes(ma.protoNames(), 'ipfs')) { |
97 |
| - ma = ma.decapsulate('ipfs') |
98 |
| - } |
99 |
| - |
100 |
| - return mafmt.UTP.matches(ma) |
| 138 | + return mafmt.UTP.matches(ma.decapsulateCode(CODE_P2P)) |
101 | 139 | })
|
102 | 140 | }
|
103 | 141 | }
|
|
0 commit comments