forked from libp2p/js-libp2p-tcp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.ts
184 lines (147 loc) · 5.26 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import net from 'net'
import * as mafmt from '@multiformats/mafmt'
import errCode from 'err-code'
import { logger } from '@libp2p/logger'
import { toMultiaddrConnection } from './socket-to-conn.js'
import { TCPListener } from './listener.js'
import { multiaddrToNetConfig } from './utils.js'
import { AbortError } from '@libp2p/interfaces/errors'
import { CODE_CIRCUIT, CODE_P2P, CODE_UNIX } from './constants.js'
import { CreateListenerOptions, DialOptions, Listener, symbol, Transport } from '@libp2p/interface-transport'
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'
import type { Socket, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net'
import type { Connection } from '@libp2p/interface-connection'
const log = logger('libp2p:tcp')
export interface TCPOptions {
/**
* An optional number in ms that is used as an inactivity timeout after which the socket will be closed
*/
inboundSocketInactivityTimeout?: number
/**
* An optional number in ms that is used as an inactivity timeout after which the socket will be closed
*/
outboundSocketInactivityTimeout?: number
/**
* When closing a socket, wait this long for it to close gracefully before it is closed more forcibly
*/
socketCloseTimeout?: number
}
/**
* Expose a subset of net.connect options
*/
export interface TCPSocketOptions extends AbortOptions {
noDelay?: boolean
keepAlive?: boolean
keepAliveInitialDelay?: number
allowHalfOpen?: boolean
}
export interface TCPDialOptions extends DialOptions, TCPSocketOptions {
}
export interface TCPCreateListenerOptions extends CreateListenerOptions, TCPSocketOptions {
}
export class TCP implements Transport {
private readonly opts: TCPOptions
constructor (options: TCPOptions = {}) {
this.opts = options
}
get [symbol] (): true {
return true
}
get [Symbol.toStringTag] () {
return '@libp2p/tcp'
}
async dial (ma: Multiaddr, options: TCPDialOptions): Promise<Connection> {
options.keepAlive = options.keepAlive ?? true
const socket = await this._connect(ma, options)
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
})
const maConn = toMultiaddrConnection(socket, {
remoteAddr: ma,
signal: options.signal,
socketInactivityTimeout: this.opts.outboundSocketInactivityTimeout,
socketCloseTimeout: this.opts.socketCloseTimeout
})
log('new outbound connection %s', maConn.remoteAddr)
const conn = await options.upgrader.upgradeOutbound(maConn)
log('outbound connection %s upgraded', maConn.remoteAddr)
return conn
}
async _connect (ma: Multiaddr, options: TCPDialOptions) {
if (options.signal?.aborted === true) {
throw new AbortError()
}
return await new Promise<Socket>((resolve, reject) => {
const start = Date.now()
const cOpts = multiaddrToNetConfig(ma) as (IpcSocketConnectOpts & TcpSocketConnectOpts)
const cOptsStr = cOpts.path ?? `${cOpts.host ?? ''}:${cOpts.port}`
log('dialing %j', cOpts)
const rawSocket = net.connect(cOpts)
const onError = (err: Error) => {
err.message = `connection error ${cOptsStr}: ${err.message}`
done(err)
}
const onTimeout = () => {
log('connection timeout %s', cOptsStr)
const err = errCode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT')
// Note: this will result in onError() being called
rawSocket.emit('error', err)
}
const onConnect = () => {
log('connection opened %j', cOpts)
done()
}
const onAbort = () => {
log('connection aborted %j', cOpts)
rawSocket.destroy()
done(new AbortError())
}
const done = (err?: any) => {
rawSocket.removeListener('error', onError)
rawSocket.removeListener('timeout', onTimeout)
rawSocket.removeListener('connect', onConnect)
if (options.signal != null) {
options.signal.removeEventListener('abort', onAbort)
}
if (err != null) {
return reject(err)
}
resolve(rawSocket)
}
rawSocket.on('error', onError)
rawSocket.on('timeout', onTimeout)
rawSocket.on('connect', onConnect)
if (options.signal != null) {
options.signal.addEventListener('abort', onAbort)
}
})
}
/**
* Creates a TCP listener. The provided `handler` function will be called
* anytime a new incoming Connection has been successfully upgraded via
* `upgrader.upgradeInbound`.
*/
createListener (options: TCPCreateListenerOptions): Listener {
return new TCPListener({
...options,
socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout,
socketCloseTimeout: this.opts.socketCloseTimeout
})
}
/**
* Takes a list of `Multiaddr`s and returns only valid TCP addresses
*/
filter (multiaddrs: Multiaddr[]): Multiaddr[] {
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]
return multiaddrs.filter(ma => {
if (ma.protoCodes().includes(CODE_CIRCUIT)) {
return false
}
if (ma.protoCodes().includes(CODE_UNIX)) {
return true
}
return mafmt.TCP.matches(ma.decapsulateCode(CODE_P2P))
})
}
}