@@ -4,7 +4,8 @@ import { toMultiaddrConnection } from './socket-to-conn.js'
4
4
import { CODE_P2P } from './constants.js'
5
5
import {
6
6
getMultiaddrs ,
7
- multiaddrToNetConfig
7
+ multiaddrToNetConfig ,
8
+ NetConfig
8
9
} from './utils.js'
9
10
import { EventEmitter , CustomEvent } from '@libp2p/interfaces/events'
10
11
import type { MultiaddrConnection , Connection } from '@libp2p/interface-connection'
@@ -25,15 +26,26 @@ async function attemptClose (maConn: MultiaddrConnection) {
25
26
}
26
27
}
27
28
29
+ export interface LimitServerConnectionsOpts {
30
+ acceptBelow : number
31
+ rejectAbove : number
32
+ }
33
+
28
34
interface Context extends TCPCreateListenerOptions {
29
35
handler ?: ( conn : Connection ) => void
30
36
upgrader : Upgrader
31
37
socketInactivityTimeout ?: number
32
38
socketCloseTimeout ?: number
33
39
maxConnections ?: number
40
+ limitServerConnections ?: LimitServerConnectionsOpts
34
41
}
35
42
36
- type Status = { started : false } | { started : true , listeningAddr : Multiaddr , peerId : string | null }
43
+ type Status = { started : false } | {
44
+ started : true
45
+ listeningAddr : Multiaddr
46
+ peerId : string | null
47
+ netConfig : NetConfig
48
+ }
37
49
38
50
export class TCPListener extends EventEmitter < ListenerEvents > implements Listener {
39
51
private readonly server : net . Server
@@ -89,12 +101,30 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
89
101
90
102
socket . once ( 'close' , ( ) => {
91
103
this . connections . delete ( maConn )
104
+
105
+ if (
106
+ this . context . limitServerConnections != null &&
107
+ this . connections . size < this . context . limitServerConnections . acceptBelow
108
+ ) {
109
+ this . netListen ( ) . catch ( e => {
110
+ log . error ( 'error attempting to listen server once connection count under limit' , e )
111
+ } )
112
+ }
92
113
} )
93
114
94
115
if ( this . context . handler != null ) {
95
116
this . context . handler ( conn )
96
117
}
97
118
119
+ if (
120
+ this . context . limitServerConnections != null &&
121
+ this . connections . size >= this . context . limitServerConnections . rejectAbove
122
+ ) {
123
+ this . netClose ( ) . catch ( e => {
124
+ log . error ( 'error attempting to close server on too many inbound connections' , e )
125
+ } )
126
+ }
127
+
98
128
this . dispatchEvent ( new CustomEvent < Connection > ( 'connection' , { detail : conn } ) )
99
129
} )
100
130
. catch ( async err => {
@@ -148,21 +178,21 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
148
178
}
149
179
150
180
async listen ( ma : Multiaddr ) {
181
+ if ( this . status . started ) {
182
+ throw Error ( 'server is already listening' )
183
+ }
184
+
151
185
const peerId = ma . getPeerId ( )
152
186
const listeningAddr = peerId == null ? ma . decapsulateCode ( CODE_P2P ) : ma
153
187
154
- this . status = { started : true , listeningAddr, peerId }
188
+ this . status = {
189
+ started : true ,
190
+ listeningAddr,
191
+ peerId,
192
+ netConfig : multiaddrToNetConfig ( listeningAddr )
193
+ }
155
194
156
- return await new Promise < void > ( ( resolve , reject ) => {
157
- const options = multiaddrToNetConfig ( listeningAddr )
158
- this . server . listen ( options , ( err ?: any ) => {
159
- if ( err != null ) {
160
- return reject ( err )
161
- }
162
- log ( 'Listening on %s' , this . server . address ( ) )
163
- resolve ( )
164
- } )
165
- } )
195
+ await this . netListen ( )
166
196
}
167
197
168
198
async close ( ) {
@@ -174,8 +204,47 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
174
204
Array . from ( this . connections . values ( ) ) . map ( async maConn => await attemptClose ( maConn ) )
175
205
)
176
206
207
+ await this . netClose ( )
208
+ }
209
+
210
+ private async netListen ( ) : Promise < void > {
211
+ if ( ! this . status . started || this . server . listening ) {
212
+ return
213
+ }
214
+
215
+ const netConfig = this . status . netConfig
216
+
177
217
await new Promise < void > ( ( resolve , reject ) => {
178
- this . server . close ( err => ( err != null ) ? reject ( err ) : resolve ( ) )
218
+ // NOTE: 'listening' event is only fired on success. Any error such as port already binded, is emitted via 'error'
219
+ this . server . once ( 'error' , reject )
220
+ this . server . listen ( netConfig , resolve )
179
221
} )
222
+
223
+ log ( 'Listening on %s' , this . server . address ( ) )
224
+ }
225
+
226
+ private async netClose ( ) : Promise < void > {
227
+ if ( ! this . status . started || ! this . server . listening ) {
228
+ return
229
+ }
230
+
231
+ log ( 'Closing server on %s' , this . server . address ( ) )
232
+
233
+ // NodeJS implementation tracks listening status with `this._handle` property.
234
+ // - Server.close() sets this._handle to null immediately. If this._handle is null, ERR_SERVER_NOT_RUNNING is thrown
235
+ // - Server.listening returns `this._handle !== null` https://github.com/nodejs/node/blob/386d761943bb1b217fba27d6b80b658c23009e60/lib/net.js#L1675
236
+ // - Server.listen() if `this._handle !== null` throws ERR_SERVER_ALREADY_LISTEN
237
+ //
238
+ // NOTE: Both listen and close are technically not async actions, so it's not necessary to track
239
+ // states 'pending-close' or 'pending-listen'
240
+
241
+ // From docs https://nodejs.org/api/net.html#serverclosecallback
242
+ // Stops the server from accepting new connections and keeps existing connections.
243
+ // 'close' event is emitted only emitted when all connections are ended.
244
+ // The optional callback will be called once the 'close' event occurs.
245
+ //
246
+ // NOTE: Since we want to keep existing connections and have checked `!this.server.listening` it's not necessary
247
+ // to pass a callback to close.
248
+ this . server . close ( )
180
249
}
181
250
}
0 commit comments