Skip to content
This repository was archived by the owner on Aug 29, 2023. It is now read-only.

Commit e8b8f2e

Browse files
authored
fix: destroy sockets on close (#204)
* fix: destroy sockets on close We call `.end` on a socket and wait for the `close` event, but calling `.end` only closes the writable end of the socket. To close both ends we either need to wait for the remote to close their writable end or we need to `.destroy` our socket. If we call `.destroy` all data is lost and no more I/O occurs. The change here is to call `.end` then check to see if we have any outgoing writes, if we do, wait for the `drain` event which means the outgoing data has been sent, then call `.destroy`, otherwise call `.destroy` immediately. At the same time use a timer to call `.destroy` if the `drain` event never arrives. It also set up the `timeout` event for the socket to allow closing the socket after a period of inactivity. Three new constructor options are added to control the behvaiour: - `inboundSocketInactivityTimeout` the socket will be closed after this many ms of not sending/recieving data (default 30s) - `outboundSocketInactivityTimeout` the socket will be closed after this many ms of not sending/recieving data (default 30s) - `socketCloseTimeout` how long to wait for the `drain` event (default 2s) Fixes #201 * chore: account for networking differences * chore: ignore client errors
1 parent 3ac4be4 commit e8b8f2e

File tree

7 files changed

+570
-46
lines changed

7 files changed

+570
-46
lines changed

README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,10 @@ const upgrader = {
4747
upgradeOutbound: maConn => maConn
4848
}
4949

50-
const tcp = new TCP({ upgrader })
50+
const tcp = new TCP()
5151

5252
const listener = tcp.createListener({
53+
upgrader,
5354
handler: (socket) => {
5455
console.log('new connection opened')
5556
pipe(

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@
153153
"aegir": "^37.0.4",
154154
"it-all": "^1.0.6",
155155
"it-pipe": "^2.0.3",
156+
"p-defer": "^4.0.0",
156157
"sinon": "^14.0.0",
157158
"uint8arrays": "^3.0.0"
158159
}

src/constants.ts

+3
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,6 @@ export const CODE_CIRCUIT = 290
44

55
// Time to wait for a connection to close gracefully before destroying it manually
66
export const CLOSE_TIMEOUT = 2000
7+
8+
// Close the socket if there is no activity after this long in ms
9+
export const SOCKET_TIMEOUT = 30000

src/index.ts

+34-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,30 @@ import type { Connection } from '@libp2p/interface-connection'
1515

1616
const log = logger('libp2p:tcp')
1717

18+
export interface TCPOptions {
19+
/**
20+
* An optional number in ms that is used as an inactivity timeout after which the socket will be closed
21+
*/
22+
inboundSocketInactivityTimeout?: number
23+
24+
/**
25+
* An optional number in ms that is used as an inactivity timeout after which the socket will be closed
26+
*/
27+
outboundSocketInactivityTimeout?: number
28+
29+
/**
30+
* When closing a socket, wait this long for it to close gracefully before it is closed more forcibly
31+
*/
32+
socketCloseTimeout?: number
33+
}
34+
1835
export class TCP implements Transport {
36+
private readonly opts: TCPOptions
37+
38+
constructor (options: TCPOptions = {}) {
39+
this.opts = options
40+
}
41+
1942
get [symbol] (): true {
2043
return true
2144
}
@@ -32,7 +55,12 @@ export class TCP implements Transport {
3255
log('socket error', err)
3356
})
3457

35-
const maConn = toMultiaddrConnection(socket, { remoteAddr: ma, signal: options.signal })
58+
const maConn = toMultiaddrConnection(socket, {
59+
remoteAddr: ma,
60+
signal: options.signal,
61+
socketInactivityTimeout: this.opts.outboundSocketInactivityTimeout,
62+
socketCloseTimeout: this.opts.socketCloseTimeout
63+
})
3664
log('new outbound connection %s', maConn.remoteAddr)
3765
const conn = await options.upgrader.upgradeOutbound(maConn)
3866
log('outbound connection %s upgraded', maConn.remoteAddr)
@@ -108,7 +136,11 @@ export class TCP implements Transport {
108136
* `upgrader.upgradeInbound`.
109137
*/
110138
createListener (options: CreateListenerOptions) {
111-
return createListener(options)
139+
return createListener({
140+
...options,
141+
socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout,
142+
socketCloseTimeout: this.opts.socketCloseTimeout
143+
})
112144
}
113145

114146
/**

src/listener.ts

+10-4
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,16 @@ async function attemptClose (maConn: MultiaddrConnection) {
3232
interface Context {
3333
handler?: (conn: Connection) => void
3434
upgrader: Upgrader
35+
socketInactivityTimeout?: number
36+
socketCloseTimeout?: number
3537
}
3638

3739
/**
3840
* Create listener
3941
*/
4042
export function createListener (context: Context) {
4143
const {
42-
handler, upgrader
44+
handler, upgrader, socketInactivityTimeout, socketCloseTimeout
4345
} = context
4446

4547
let peerId: string | null
@@ -53,7 +55,11 @@ export function createListener (context: Context) {
5355

5456
let maConn: MultiaddrConnection
5557
try {
56-
maConn = toMultiaddrConnection(socket, { listeningAddr })
58+
maConn = toMultiaddrConnection(socket, {
59+
listeningAddr,
60+
socketInactivityTimeout,
61+
socketCloseTimeout
62+
})
5763
} catch (err) {
5864
log.error('inbound connection failed', err)
5965
return
@@ -139,9 +145,9 @@ export function createListener (context: Context) {
139145
return
140146
}
141147

142-
await Promise.all([
148+
await Promise.all(
143149
server.__connections.map(async maConn => await attemptClose(maConn))
144-
])
150+
)
145151

146152
await new Promise<void>((resolve, reject) => {
147153
server.close(err => (err != null) ? reject(err) : resolve())

src/socket-to-conn.ts

+91-39
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ import { logger } from '@libp2p/logger'
33
// @ts-expect-error no types
44
import toIterable from 'stream-to-it'
55
import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr'
6-
import { CLOSE_TIMEOUT } from './constants.js'
6+
import { CLOSE_TIMEOUT, SOCKET_TIMEOUT } from './constants.js'
7+
import errCode from 'err-code'
78
import type { Socket } from 'net'
89
import type { Multiaddr } from '@multiformats/multiaddr'
910
import type { MultiaddrConnection } from '@libp2p/interface-connection'
@@ -15,6 +16,8 @@ interface ToConnectionOptions {
1516
remoteAddr?: Multiaddr
1617
localAddr?: Multiaddr
1718
signal?: AbortSignal
19+
socketInactivityTimeout?: number
20+
socketCloseTimeout?: number
1821
}
1922

2023
/**
@@ -23,6 +26,8 @@ interface ToConnectionOptions {
2326
*/
2427
export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOptions) => {
2528
options = options ?? {}
29+
const inactivityTimeout = options.socketInactivityTimeout ?? SOCKET_TIMEOUT
30+
const closeTimeout = options.socketCloseTimeout ?? CLOSE_TIMEOUT
2631

2732
// Check if we are connected on a unix path
2833
if (options.listeningAddr?.getPath() != null) {
@@ -33,22 +38,51 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
3338
options.localAddr = options.remoteAddr
3439
}
3540

41+
const remoteAddr = options.remoteAddr ?? toMultiaddr(socket.remoteAddress ?? '', socket.remotePort ?? '')
42+
const { host, port } = remoteAddr.toOptions()
3643
const { sink, source } = toIterable.duplex(socket)
3744

45+
// by default there is no timeout
46+
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback
47+
socket.setTimeout(inactivityTimeout, () => {
48+
log('%s:%s socket read timeout', host, port)
49+
50+
// only destroy with an error if the remote has not sent the FIN message
51+
let err: Error | undefined
52+
if (socket.readable) {
53+
err = errCode(new Error('Socket read timeout'), 'ERR_SOCKET_READ_TIMEOUT')
54+
}
55+
56+
// if the socket times out due to inactivity we must manually close the connection
57+
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-timeout
58+
socket.destroy(err)
59+
})
60+
61+
socket.once('close', () => {
62+
log('%s:%s socket closed', host, port)
63+
64+
// In instances where `close` was not explicitly called,
65+
// such as an iterable stream ending, ensure we have set the close
66+
// timeline
67+
if (maConn.timeline.close == null) {
68+
maConn.timeline.close = Date.now()
69+
}
70+
})
71+
72+
socket.once('end', () => {
73+
// the remote sent a FIN packet which means no more data will be sent
74+
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-end
75+
log('socket ended', maConn.remoteAddr.toString())
76+
})
77+
3878
const maConn: MultiaddrConnection = {
3979
async sink (source) {
4080
if ((options?.signal) != null) {
4181
source = abortableSource(source, options.signal)
4282
}
4383

4484
try {
45-
await sink((async function * () {
46-
for await (const chunk of source) {
47-
// Convert BufferList to Buffer
48-
// Sink in StreamMuxer define argument as Uint8Array so chunk type infers as number which can't be sliced
49-
yield Buffer.isBuffer(chunk) ? chunk : chunk.slice()
50-
}
51-
})())
85+
await sink(source)
5286
} catch (err: any) {
5387
// If aborted we can safely ignore
5488
if (err.type !== 'aborted') {
@@ -58,66 +92,84 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
5892
log(err)
5993
}
6094
}
95+
96+
// we have finished writing, send the FIN message
97+
socket.end()
6198
},
6299

63-
// Missing Type for "abortable"
64100
source: (options.signal != null) ? abortableSource(source, options.signal) : source,
65101

66102
// If the remote address was passed, use it - it may have the peer ID encapsulated
67-
remoteAddr: options.remoteAddr ?? toMultiaddr(socket.remoteAddress ?? '', socket.remotePort ?? ''),
103+
remoteAddr,
68104

69105
timeline: { open: Date.now() },
70106

71107
async close () {
72-
if (socket.destroyed) return
108+
if (socket.destroyed) {
109+
log('%s:%s socket was already destroyed when trying to close', host, port)
110+
return
111+
}
73112

74-
return await new Promise((resolve, reject) => {
113+
log('%s:%s closing socket', host, port)
114+
await new Promise<void>((resolve, reject) => {
75115
const start = Date.now()
76116

77117
// Attempt to end the socket. If it takes longer to close than the
78118
// timeout, destroy it manually.
79119
const timeout = setTimeout(() => {
80-
const { host, port } = maConn.remoteAddr.toOptions()
81-
log(
82-
'timeout closing socket to %s:%s after %dms, destroying it manually',
83-
host,
84-
port,
85-
Date.now() - start
86-
)
87-
88120
if (socket.destroyed) {
89121
log('%s:%s is already destroyed', host, port)
122+
resolve()
90123
} else {
91-
socket.destroy()
92-
}
124+
log('%s:%s socket close timeout after %dms, destroying it manually', host, port, Date.now() - start)
93125

94-
resolve()
95-
}, CLOSE_TIMEOUT).unref()
126+
// will trigger 'error' and 'close' events that resolves promise
127+
socket.destroy(errCode(new Error('Socket close timeout'), 'ERR_SOCKET_CLOSE_TIMEOUT'))
128+
}
129+
}, closeTimeout).unref()
96130

97131
socket.once('close', () => {
132+
log('%s:%s socket closed', host, port)
133+
// socket completely closed
98134
clearTimeout(timeout)
99135
resolve()
100136
})
101-
socket.end((err?: Error & { code?: string }) => {
102-
clearTimeout(timeout)
103-
maConn.timeline.close = Date.now()
104-
if (err != null) {
105-
return reject(err)
137+
socket.once('error', (err: Error) => {
138+
log('%s:%s socket error', host, port, err)
139+
140+
// error closing socket
141+
if (maConn.timeline.close == null) {
142+
maConn.timeline.close = Date.now()
106143
}
107-
resolve()
144+
145+
if (socket.destroyed) {
146+
clearTimeout(timeout)
147+
}
148+
149+
reject(err)
108150
})
151+
152+
// shorten inactivity timeout
153+
socket.setTimeout(closeTimeout)
154+
155+
// close writable end of the socket
156+
socket.end()
157+
158+
if (socket.writableLength > 0) {
159+
// there are outgoing bytes waiting to be sent
160+
socket.once('drain', () => {
161+
log('%s:%s socket drained', host, port)
162+
163+
// all bytes have been sent we can destroy the socket (maybe) before the timeout
164+
socket.destroy()
165+
})
166+
} else {
167+
// nothing to send, destroy immediately
168+
socket.destroy()
169+
}
109170
})
110171
}
111172
}
112173

113-
socket.once('close', () => {
114-
// In instances where `close` was not explicitly called,
115-
// such as an iterable stream ending, ensure we have set the close
116-
// timeline
117-
if (maConn.timeline.close == null) {
118-
maConn.timeline.close = Date.now()
119-
}
120-
})
121-
122174
return maConn
123175
}

0 commit comments

Comments
 (0)