Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit e2a32ad

Browse files
fix: reduce async iterator loops per package in _createSink (#224)
Javascript abstractions are not free. While using pipe here looks nice, it adds a non-neglible cost allocating Promises for each extra `for await ()` iteration. - Similar rationale to libp2p/js-libp2p#1420 (comment) This PR merges sink pipe components in a single iteration Co-authored-by: achingbrain <[email protected]>
1 parent 9d4dd87 commit e2a32ad

File tree

7 files changed

+33
-35
lines changed

7 files changed

+33
-35
lines changed

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@
153153
"any-signal": "^3.0.0",
154154
"benchmark": "^2.1.4",
155155
"err-code": "^3.0.1",
156-
"it-pipe": "^2.0.3",
157156
"it-pushable": "^3.1.0",
158157
"it-stream-types": "^1.0.4",
159158
"rate-limiter-flexible": "^2.3.9",
@@ -172,6 +171,7 @@
172171
"it-drain": "^2.0.0",
173172
"it-foreach": "^1.0.0",
174173
"it-map": "^2.0.0",
174+
"it-pipe": "^2.0.3",
175175
"it-to-buffer": "^3.0.0",
176176
"p-defer": "^4.0.0",
177177
"random-int": "^3.0.0",

src/decode.ts

+1-19
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { MessageTypeNames, MessageTypes } from './message-types.js'
22
import { Uint8ArrayList } from 'uint8arraylist'
3-
import type { Source } from 'it-stream-types'
43
import type { Message } from './message-types.js'
54

65
export const MAX_MSG_SIZE = 1 << 20 // 1MB
@@ -13,7 +12,7 @@ interface MessageHeader {
1312
length: number
1413
}
1514

16-
class Decoder {
15+
export class Decoder {
1716
private readonly _buffer: Uint8ArrayList
1817
private _headerInfo: MessageHeader | null
1918
private readonly _maxMessageSize: number
@@ -136,20 +135,3 @@ function readVarInt (buf: Uint8ArrayList, offset: number = 0) {
136135
offset
137136
}
138137
}
139-
140-
/**
141-
* Decode a chunk and yield an _array_ of decoded messages
142-
*/
143-
export function decode (maxMessageSize: number = MAX_MSG_SIZE, maxUnprocessedMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) {
144-
return async function * decodeMessages (source: Source<Uint8Array>): Source<Message> {
145-
const decoder = new Decoder(maxMessageSize, maxUnprocessedMessageQueueSize)
146-
147-
for await (const chunk of source) {
148-
const msgs = decoder.write(chunk)
149-
150-
if (msgs.length > 0) {
151-
yield * msgs
152-
}
153-
}
154-
}
155-
}

src/mplex.ts

+7-10
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
import { pipe } from 'it-pipe'
21
import { pushableV } from 'it-pushable'
32
import { abortableSource } from 'abortable-iterator'
43
import { encode } from './encode.js'
5-
import { decode } from './decode.js'
4+
import { Decoder } from './decode.js'
65
import { MessageTypes, MessageTypeNames, Message } from './message-types.js'
76
import { createStream } from './stream.js'
87
import { toString as uint8ArrayToString } from 'uint8arrays'
@@ -201,15 +200,13 @@ export class MplexStreamMuxer implements StreamMuxer {
201200
source = abortableSource(source, anySignal(abortSignals))
202201

203202
try {
204-
await pipe(
205-
source,
206-
decode(this._init.maxMsgSize, this._init.maxUnprocessedMessageQueueSize),
207-
async source => {
208-
for await (const msg of source) {
209-
await this._handleIncoming(msg)
210-
}
203+
const decoder = new Decoder(this._init.maxMsgSize, this._init.maxUnprocessedMessageQueueSize)
204+
205+
for await (const chunk of source) {
206+
for (const msg of decoder.write(chunk)) {
207+
await this._handleIncoming(msg)
211208
}
212-
)
209+
}
213210

214211
this._source.end()
215212
} catch (err: any) {

test/coder.spec.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import { expect } from 'aegir/chai'
55
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
66
import { encode } from '../src/encode.js'
7-
import { decode } from '../src/decode.js'
7+
import { decode } from './fixtures/decode.js'
88
import all from 'it-all'
99
import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
1010
import { messageWithBytes } from './fixtures/utils.js'

test/fixtures/decode.ts

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/* eslint-env mocha */
2+
3+
import type { Message } from '../../src/message-types.js'
4+
import { Decoder, MAX_MSG_QUEUE_SIZE, MAX_MSG_SIZE } from '../../src/decode.js'
5+
import type { Source } from 'it-stream-types'
6+
7+
export function decode (maxMessageSize: number = MAX_MSG_SIZE, maxUnprocessedMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) {
8+
return async function * decodeMessages (source: Source<Uint8Array>): Source<Message> {
9+
const decoder = new Decoder(maxMessageSize, maxUnprocessedMessageQueueSize)
10+
11+
for await (const chunk of source) {
12+
const msgs = decoder.write(chunk)
13+
14+
if (msgs.length > 0) {
15+
yield * msgs
16+
}
17+
}
18+
}
19+
}

test/mplex.spec.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import all from 'it-all'
1111
import type { Source } from 'it-stream-types'
1212
import delay from 'delay'
1313
import pDefer from 'p-defer'
14-
import { decode } from '../src/decode.js'
14+
import { decode } from './fixtures/decode.js'
1515
import { pushable } from 'it-pushable'
1616
import { Uint8ArrayList } from 'uint8arraylist'
1717

@@ -135,8 +135,8 @@ describe('mplex', () => {
135135
streamSourceError.reject(new Error('Stream source did not error'))
136136
})
137137
.catch(err => {
138-
// should have errored before all messages were sent
139-
expect(sent).to.equal(2)
138+
// should have errored before all 102 messages were sent
139+
expect(sent).to.be.lessThan(10)
140140
streamSourceError.resolve(err)
141141
})
142142
}

test/restrict-size.spec.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import drain from 'it-drain'
88
import each from 'it-foreach'
99
import { Message, MessageTypes } from '../src/message-types.js'
1010
import { encode } from '../src/encode.js'
11-
import { decode } from '../src/decode.js'
11+
import { decode } from './fixtures/decode.js'
1212
import { Uint8ArrayList } from 'uint8arraylist'
1313
import toBuffer from 'it-to-buffer'
1414

0 commit comments

Comments
 (0)