This repository was archived by the owner on Aug 29, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathindex.js
141 lines (123 loc) · 4.14 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
'use strict'
const net = require('net')
const mafmt = require('mafmt')
const withIs = require('class-is')
const errCode = require('err-code')
const log = require('debug')('libp2p:tcp')
const toConnection = require('./socket-to-conn')
const createListener = require('./listener')
const { multiaddrToNetConfig } = require('./utils')
const { AbortError } = require('abortable-iterator')
const { CODE_CIRCUIT, CODE_P2P } = require('./constants')
/**
* @class TCP
*/
class TCP {
/**
* @constructor
* @param {object} options
* @param {Upgrader} options.upgrader
*/
constructor ({ upgrader }) {
if (!upgrader) {
throw new Error('An upgrader must be provided. See https://github.com/libp2p/interface-transport#upgrader.')
}
this._upgrader = upgrader
}
/**
* @async
* @param {Multiaddr} ma
* @param {object} options
* @param {AbortSignal} options.signal Used to abort dial requests
* @returns {Connection} An upgraded Connection
*/
async dial (ma, options) {
options = options || {}
const socket = await this._connect(ma, options)
const maConn = toConnection(socket, { remoteAddr: ma, signal: options.signal })
log('new outbound connection %s', maConn.remoteAddr)
const conn = await this._upgrader.upgradeOutbound(maConn)
log('outbound connection %s upgraded', maConn.remoteAddr)
return conn
}
/**
* @private
* @param {Multiaddr} ma
* @param {object} options
* @param {AbortSignal} options.signal Used to abort dial requests
* @returns {Promise<Socket>} Resolves a TCP Socket
*/
_connect (ma, options = {}) {
if (options.signal && options.signal.aborted) {
throw new AbortError()
}
return new Promise((resolve, reject) => {
const start = Date.now()
const cOpts = multiaddrToNetConfig(ma)
log('dialing %j', cOpts)
const rawSocket = net.connect(cOpts)
const onError = err => {
err.message = `connection error ${cOpts.host}:${cOpts.port}: ${err.message}`
done(err)
}
const onTimeout = () => {
log('connnection timeout %s:%s', cOpts.host, cOpts.port)
const err = errCode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT')
// Note: this will result in onError() being called
rawSocket.emit('error', err)
}
const onConnect = () => {
log('connection opened %j', cOpts)
done()
}
const onAbort = () => {
log('connection aborted %j', cOpts)
rawSocket.destroy()
done(new AbortError())
}
const done = err => {
rawSocket.removeListener('error', onError)
rawSocket.removeListener('timeout', onTimeout)
rawSocket.removeListener('connect', onConnect)
options.signal && options.signal.removeEventListener('abort', onAbort)
if (err) return reject(err)
resolve(rawSocket)
}
rawSocket.on('error', onError)
rawSocket.on('timeout', onTimeout)
rawSocket.on('connect', onConnect)
options.signal && options.signal.addEventListener('abort', onAbort)
})
}
/**
* Creates a TCP listener. The provided `handler` function will be called
* anytime a new incoming Connection has been successfully upgraded via
* `upgrader.upgradeInbound`.
* @param {*} [options]
* @param {function(Connection)} handler
* @returns {Listener} A TCP listener
*/
createListener (options, handler) {
if (typeof options === 'function') {
handler = options
options = {}
}
options = options || {}
return createListener({ handler, upgrader: this._upgrader }, options)
}
/**
* Takes a list of `Multiaddr`s and returns only valid TCP addresses
* @param {Multiaddr[]} multiaddrs
* @returns {Multiaddr[]} Valid TCP multiaddrs
*/
filter (multiaddrs) {
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]
return multiaddrs.filter(ma => {
if (ma.protoCodes().includes(CODE_CIRCUIT)) {
return false
}
return mafmt.TCP.matches(ma.decapsulateCode(CODE_P2P))
})
}
}
module.exports = withIs(TCP, { className: 'TCP', symbolName: '@libp2p/js-libp2p-tcp/tcp' })