@@ -4,12 +4,21 @@ const safeBuffer = require('safe-buffer');
4
4
const zlib = require ( 'zlib' ) ;
5
5
6
6
const bufferUtil = require ( './BufferUtil' ) ;
7
+ const Limiter = require ( 'async-limiter' ) ;
7
8
8
9
const Buffer = safeBuffer . Buffer ;
9
10
10
11
const TRAILER = Buffer . from ( [ 0x00 , 0x00 , 0xff , 0xff ] ) ;
11
12
const EMPTY_BLOCK = Buffer . from ( [ 0x00 ] ) ;
12
13
14
+ // We limit zlib concurrency, which prevents severe memory fragmentation
15
+ // as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
16
+ // and https://github.com/websockets/ws/issues/1202
17
+ //
18
+ // Intentionally global; it's the global thread pool that's
19
+ // an issue.
20
+ let zlibLimiter ;
21
+
13
22
/**
14
23
* Per-message Deflate implementation.
15
24
*/
@@ -25,6 +34,13 @@ class PerMessageDeflate {
25
34
this . _inflate = null ;
26
35
27
36
this . params = null ;
37
+
38
+ if ( ! zlibLimiter ) {
39
+ const concurrency = this . _options . concurrencyLimit !== undefined
40
+ ? this . _options . concurrencyLimit
41
+ : 10 ;
42
+ zlibLimiter = new Limiter ( { concurrency } ) ;
43
+ }
28
44
}
29
45
30
46
static get extensionName ( ) {
@@ -249,14 +265,48 @@ class PerMessageDeflate {
249
265
}
250
266
251
267
/**
252
- * Decompress data.
268
+ * Decompress data. Concurrency limited by async-limiter.
253
269
*
254
270
* @param {Buffer } data Compressed data
255
271
* @param {Boolean } fin Specifies whether or not this is the last fragment
256
272
* @param {Function } callback Callback
257
273
* @public
258
274
*/
259
275
decompress ( data , fin , callback ) {
276
+ zlibLimiter . push ( ( done ) => {
277
+ this . _decompress ( data , fin , function ( err , result ) {
278
+ done ( ) ;
279
+ callback ( err , result ) ;
280
+ } ) ;
281
+ } ) ;
282
+ }
283
+
284
+ /**
285
+ * Compress data. Concurrency limited by async-limiter.
286
+ *
287
+ * @param {Buffer } data Data to compress
288
+ * @param {Boolean } fin Specifies whether or not this is the last fragment
289
+ * @param {Function } callback Callback
290
+ * @public
291
+ */
292
+ compress ( data , fin , callback ) {
293
+ zlibLimiter . push ( ( done ) => {
294
+ this . _compress ( data , fin , function ( err , result ) {
295
+ done ( ) ;
296
+ callback ( err , result ) ;
297
+ } ) ;
298
+ } ) ;
299
+ }
300
+
301
+ /**
302
+ * Decompress data.
303
+ *
304
+ * @param {Buffer } data Compressed data
305
+ * @param {Boolean } fin Specifies whether or not this is the last fragment
306
+ * @param {Function } callback Callback
307
+ * @private
308
+ */
309
+ _decompress ( data , fin , callback ) {
260
310
const endpoint = this . _isServer ? 'client' : 'server' ;
261
311
262
312
if ( ! this . _inflate ) {
@@ -322,9 +372,9 @@ class PerMessageDeflate {
322
372
* @param {Buffer } data Data to compress
323
373
* @param {Boolean } fin Specifies whether or not this is the last fragment
324
374
* @param {Function } callback Callback
325
- * @public
375
+ * @private
326
376
*/
327
- compress ( data , fin , callback ) {
377
+ _compress ( data , fin , callback ) {
328
378
if ( ! data || data . length === 0 ) {
329
379
process . nextTick ( callback , null , EMPTY_BLOCK ) ;
330
380
return ;
0 commit comments