Skip to content
This repository was archived by the owner on Aug 29, 2023. It is now read-only.

fix: destroy sockets on close #204

Merged
merged 3 commits into from
Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ const upgrader = {
upgradeOutbound: maConn => maConn
}

const tcp = new TCP({ upgrader })
const tcp = new TCP()

const listener = tcp.createListener({
upgrader,
handler: (socket) => {
console.log('new connection opened')
pipe(
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
"aegir": "^37.0.4",
"it-all": "^1.0.6",
"it-pipe": "^2.0.3",
"p-defer": "^4.0.0",
"sinon": "^14.0.0",
"uint8arrays": "^3.0.0"
}
Expand Down
3 changes: 3 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ export const CODE_CIRCUIT = 290

// Time to wait for a connection to close gracefully before destroying it manually
export const CLOSE_TIMEOUT = 2000

// Close the socket if there is no activity after this long in ms
export const SOCKET_TIMEOUT = 30000
36 changes: 34 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,30 @@ import type { Connection } from '@libp2p/interface-connection'

const log = logger('libp2p:tcp')

export interface TCPOptions {
/**
* An optional number in ms that is used as an inactivity timeout after which the socket will be closed
*/
inboundSocketInactivityTimeout?: number

/**
* An optional number in ms that is used as an inactivity timeout after which the socket will be closed
*/
outboundSocketInactivityTimeout?: number

/**
* When closing a socket, wait this long for it to close gracefully before it is closed more forcibly
*/
socketCloseTimeout?: number
}

export class TCP implements Transport {
private readonly opts: TCPOptions

constructor (options: TCPOptions = {}) {
this.opts = options
}

get [symbol] (): true {
return true
}
Expand All @@ -32,7 +55,12 @@ export class TCP implements Transport {
log('socket error', err)
})

const maConn = toMultiaddrConnection(socket, { remoteAddr: ma, signal: options.signal })
const maConn = toMultiaddrConnection(socket, {
remoteAddr: ma,
signal: options.signal,
socketInactivityTimeout: this.opts.outboundSocketInactivityTimeout,
socketCloseTimeout: this.opts.socketCloseTimeout
})
log('new outbound connection %s', maConn.remoteAddr)
const conn = await options.upgrader.upgradeOutbound(maConn)
log('outbound connection %s upgraded', maConn.remoteAddr)
Expand Down Expand Up @@ -108,7 +136,11 @@ export class TCP implements Transport {
* `upgrader.upgradeInbound`.
*/
createListener (options: CreateListenerOptions) {
return createListener(options)
return createListener({
...options,
socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout,
socketCloseTimeout: this.opts.socketCloseTimeout
})
}

/**
Expand Down
14 changes: 10 additions & 4 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ async function attemptClose (maConn: MultiaddrConnection) {
interface Context {
handler?: (conn: Connection) => void
upgrader: Upgrader
socketInactivityTimeout?: number
socketCloseTimeout?: number
}

/**
* Create listener
*/
export function createListener (context: Context) {
const {
handler, upgrader
handler, upgrader, socketInactivityTimeout, socketCloseTimeout
} = context

let peerId: string | null
Expand All @@ -53,7 +55,11 @@ export function createListener (context: Context) {

let maConn: MultiaddrConnection
try {
maConn = toMultiaddrConnection(socket, { listeningAddr })
maConn = toMultiaddrConnection(socket, {
listeningAddr,
socketInactivityTimeout,
socketCloseTimeout
})
} catch (err) {
log.error('inbound connection failed', err)
return
Expand Down Expand Up @@ -139,9 +145,9 @@ export function createListener (context: Context) {
return
}

await Promise.all([
await Promise.all(
server.__connections.map(async maConn => await attemptClose(maConn))
])
)

await new Promise<void>((resolve, reject) => {
server.close(err => (err != null) ? reject(err) : resolve())
Expand Down
130 changes: 91 additions & 39 deletions src/socket-to-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import { logger } from '@libp2p/logger'
// @ts-expect-error no types
import toIterable from 'stream-to-it'
import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr'
import { CLOSE_TIMEOUT } from './constants.js'
import { CLOSE_TIMEOUT, SOCKET_TIMEOUT } from './constants.js'
import errCode from 'err-code'
import type { Socket } from 'net'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { MultiaddrConnection } from '@libp2p/interface-connection'
Expand All @@ -15,6 +16,8 @@ interface ToConnectionOptions {
remoteAddr?: Multiaddr
localAddr?: Multiaddr
signal?: AbortSignal
socketInactivityTimeout?: number
socketCloseTimeout?: number
}

/**
Expand All @@ -23,6 +26,8 @@ interface ToConnectionOptions {
*/
export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOptions) => {
options = options ?? {}
const inactivityTimeout = options.socketInactivityTimeout ?? SOCKET_TIMEOUT
const closeTimeout = options.socketCloseTimeout ?? CLOSE_TIMEOUT

// Check if we are connected on a unix path
if (options.listeningAddr?.getPath() != null) {
Expand All @@ -33,22 +38,51 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
options.localAddr = options.remoteAddr
}

const remoteAddr = options.remoteAddr ?? toMultiaddr(socket.remoteAddress ?? '', socket.remotePort ?? '')
const { host, port } = remoteAddr.toOptions()
const { sink, source } = toIterable.duplex(socket)

// by default there is no timeout
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback
socket.setTimeout(inactivityTimeout, () => {
log('%s:%s socket read timeout', host, port)

// only destroy with an error if the remote has not sent the FIN message
let err: Error | undefined
if (socket.readable) {
err = errCode(new Error('Socket read timeout'), 'ERR_SOCKET_READ_TIMEOUT')
}

// if the socket times out due to inactivity we must manually close the connection
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-timeout
socket.destroy(err)
})

socket.once('close', () => {
log('%s:%s socket closed', host, port)

// In instances where `close` was not explicitly called,
// such as an iterable stream ending, ensure we have set the close
// timeline
if (maConn.timeline.close == null) {
maConn.timeline.close = Date.now()
}
})

socket.once('end', () => {
// the remote sent a FIN packet which means no more data will be sent
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-end
log('socket ended', maConn.remoteAddr.toString())
})

const maConn: MultiaddrConnection = {
async sink (source) {
if ((options?.signal) != null) {
source = abortableSource(source, options.signal)
}

try {
await sink((async function * () {
for await (const chunk of source) {
// Convert BufferList to Buffer
// Sink in StreamMuxer define argument as Uint8Array so chunk type infers as number which can't be sliced
yield Buffer.isBuffer(chunk) ? chunk : chunk.slice()
}
})())
await sink(source)
} catch (err: any) {
// If aborted we can safely ignore
if (err.type !== 'aborted') {
Expand All @@ -58,66 +92,84 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
log(err)
}
}

// we have finished writing, send the FIN message
socket.end()
},

// Missing Type for "abortable"
source: (options.signal != null) ? abortableSource(source, options.signal) : source,

// If the remote address was passed, use it - it may have the peer ID encapsulated
remoteAddr: options.remoteAddr ?? toMultiaddr(socket.remoteAddress ?? '', socket.remotePort ?? ''),
remoteAddr,

timeline: { open: Date.now() },

async close () {
if (socket.destroyed) return
if (socket.destroyed) {
log('%s:%s socket was already destroyed when trying to close', host, port)
return
}

return await new Promise((resolve, reject) => {
log('%s:%s closing socket', host, port)
await new Promise<void>((resolve, reject) => {
const start = Date.now()

// Attempt to end the socket. If it takes longer to close than the
// timeout, destroy it manually.
const timeout = setTimeout(() => {
const { host, port } = maConn.remoteAddr.toOptions()
log(
'timeout closing socket to %s:%s after %dms, destroying it manually',
host,
port,
Date.now() - start
)

if (socket.destroyed) {
log('%s:%s is already destroyed', host, port)
resolve()
} else {
socket.destroy()
}
log('%s:%s socket close timeout after %dms, destroying it manually', host, port, Date.now() - start)

resolve()
}, CLOSE_TIMEOUT).unref()
// will trigger 'error' and 'close' events that resolves promise
socket.destroy(errCode(new Error('Socket close timeout'), 'ERR_SOCKET_CLOSE_TIMEOUT'))
}
}, closeTimeout).unref()

socket.once('close', () => {
log('%s:%s socket closed', host, port)
// socket completely closed
clearTimeout(timeout)
resolve()
})
socket.end((err?: Error & { code?: string }) => {
clearTimeout(timeout)
maConn.timeline.close = Date.now()
if (err != null) {
return reject(err)
socket.once('error', (err: Error) => {
log('%s:%s socket error', host, port, err)

// error closing socket
if (maConn.timeline.close == null) {
maConn.timeline.close = Date.now()
}
resolve()

if (socket.destroyed) {
clearTimeout(timeout)
}

reject(err)
})

// shorten inactivity timeout
socket.setTimeout(closeTimeout)

// close writable end of the socket
socket.end()

if (socket.writableLength > 0) {
// there are outgoing bytes waiting to be sent
socket.once('drain', () => {
log('%s:%s socket drained', host, port)

// all bytes have been sent we can destroy the socket (maybe) before the timeout
socket.destroy()
})
} else {
// nothing to send, destroy immediately
socket.destroy()
}
})
}
}

socket.once('close', () => {
// In instances where `close` was not explicitly called,
// such as an iterable stream ending, ensure we have set the close
// timeline
if (maConn.timeline.close == null) {
maConn.timeline.close = Date.now()
}
})

return maConn
}
Loading