This repository was archived by the owner on Aug 29, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathlistener.js
153 lines (124 loc) · 4.1 KB
/
listener.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
'use strict'
const multiaddr = require('multiaddr')
const Connection = require('interface-connection').Connection
const os = require('os')
const net = require('net')
const toPull = require('stream-to-pull-stream')
const EventEmitter = require('events').EventEmitter
const debug = require('debug')
const log = debug('libp2p:tcp:listen')
const getMultiaddr = require('./get-multiaddr')
const CLOSE_TIMEOUT = 2000
function noop () {}
module.exports = (handler) => {
const listener = new EventEmitter()
const server = net.createServer((socket) => {
// Avoid uncaught errors cause by unstable connections
socket.on('error', noop)
const addr = getMultiaddr(socket)
if (!addr) {
if (socket.remoteAddress === undefined) {
log('connection closed before p2p connection made')
} else {
log('error interpreting incoming p2p connection')
}
return
}
log('new connection', addr.toString())
const s = toPull.duplex(socket)
s.getObservedAddrs = (cb) => {
cb(null, [addr])
}
trackSocket(server, socket)
const conn = new Connection(s)
handler(conn)
listener.emit('connection', conn)
})
server.on('listening', () => listener.emit('listening'))
server.on('error', (err) => listener.emit('error', err))
server.on('close', () => listener.emit('close'))
// Keep track of open connections to destroy in case of timeout
server.__connections = {}
listener.close = (options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}
callback = callback || noop
options = options || {}
const timeout = setTimeout(() => {
log('unable to close graciously, destroying conns')
Object.keys(server.__connections).forEach((key) => {
log('destroying %s', key)
server.__connections[key].destroy()
})
}, options.timeout || CLOSE_TIMEOUT)
server.close(callback)
server.once('close', () => {
clearTimeout(timeout)
})
}
let listeningAddr
listener.listen = (ma, callback) => {
listeningAddr = ma
const lOpts = listeningAddr.toOptions()
log('Listening on %s %s', lOpts.port, lOpts.host)
return server.listen(lOpts.port, lOpts.host, callback)
}
listener.getAddrs = (callback) => {
const multiaddrs = []
const address = server.address()
if (!address) {
return callback(new Error('Listener is not ready yet'))
}
// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (listeningAddr.toString().indexOf('ip4') !== -1) {
if (listeningAddr.toString().indexOf('0.0.0.0') !== -1) {
const netInterfaces = os.networkInterfaces()
Object.keys(netInterfaces).forEach((niKey) => {
netInterfaces[niKey].forEach((ni) => {
if (ni.family === 'IPv4') {
multiaddrs.push(
multiaddr(listeningAddr.toString().replace('0.0.0.0', ni.address))
)
}
})
})
} else {
multiaddrs.push(listeningAddr)
}
}
if (address.family === 'IPv6') {
// Listen on all available addresses when using wildcard
if (listeningAddr.toString().indexOf('/::/') !== -1) {
const netInterfaces = os.networkInterfaces()
Object.keys(netInterfaces).forEach((niKey) => {
netInterfaces[niKey].forEach((ni) => {
if (ni.family === address.family) {
const maOpts = listeningAddr.toOptions()
if (maOpts.host === '::') {
maOpts.family = address.family
maOpts.address = ni.address
multiaddrs.push(
multiaddr.fromNodeAddress(maOpts, maOpts.transport)
)
}
}
})
})
} else {
multiaddrs.push(listeningAddr)
}
}
callback(null, multiaddrs)
}
return listener
}
function trackSocket (server, socket) {
const key = `${socket.remoteAddress}:${socket.remotePort}`
server.__connections[key] = socket
socket.on('close', () => {
delete server.__connections[key]
})
}