Skip to content

Commit 4e2a49d

Browse files
authored
feat: add message byte batching (libp2p#235)
* feat: add message byte batching Adds a new setting `minSendBytes` that is `undefined` by default. If `undefined` all messages sent through multiplexed streams will be serialized and sent over the wire immediately. If set to a number, it will be used as a byte value, and the serialized bytes of all messages sent during the current tick will be buffered up to that value. Once either the buffer lengths hits that value or the next tick begins, all bytes in the buffer will be sent over the wire. * chore: add readme note
1 parent 618a917 commit 4e2a49d

File tree

6 files changed

+96
-7
lines changed

6 files changed

+96
-7
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ Creates a factory that can be used to create new muxers.
7171

7272
- `maxMsgSize` - a number that defines how large mplex data messages can be in bytes, if messages are larger than this they will be sent as multiple messages (default: 1048576 - e.g. 1MB)
7373
- `maxUnprocessedMessageQueueSize` - a number that limits the size of the unprocessed input buffer (default: 4194304 - e.g. 4MB)
74+
- `minSendBytes` - if set, message bytes from the current tick will be batched up to this amount before being yielded by the muxer source, unless the next tick begins in which case all available bytes will be yielded
7475
- `maxInboundStreams` - a number that defines how many incoming streams are allowed per connection (default: 1024)
7576
- `maxOutboundStreams` - a number that defines how many outgoing streams are allowed per connection (default: 1024)
7677
- `maxStreamBufferSize` - a number that defines how large the message buffer is allowed to grow (default: 1024 \* 1024 \* 4 - e.g. 4MB)

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@
153153
"any-signal": "^3.0.0",
154154
"benchmark": "^2.1.4",
155155
"err-code": "^3.0.1",
156+
"it-batched-bytes": "^1.0.0",
156157
"it-pushable": "^3.1.0",
157158
"it-stream-types": "^1.0.4",
158159
"rate-limiter-flexible": "^2.3.9",

src/encode.ts

+22-6
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import varint from 'varint'
33
import { Uint8ArrayList } from 'uint8arraylist'
44
import { allocUnsafe } from './alloc-unsafe.js'
55
import { Message, MessageTypes } from './message-types.js'
6+
import batchedBytes from 'it-batched-bytes'
67

78
const POOL_SIZE = 10 * 1024
89

@@ -55,14 +56,29 @@ const encoder = new Encoder()
5556
/**
5657
* Encode and yield one or more messages
5758
*/
58-
export async function * encode (source: Source<Message[]>) {
59-
for await (const msgs of source) {
60-
const list = new Uint8ArrayList()
59+
export async function * encode (source: Source<Message[]>, minSendBytes: number = 0) {
60+
if (minSendBytes == null || minSendBytes === 0) {
61+
// just send the messages
62+
for await (const messages of source) {
63+
const list = new Uint8ArrayList()
6164

62-
for (const msg of msgs) {
63-
encoder.write(msg, list)
65+
for (const msg of messages) {
66+
encoder.write(msg, list)
67+
}
68+
69+
yield list.subarray()
6470
}
6571

66-
yield list.subarray()
72+
return
6773
}
74+
75+
// batch messages up for sending
76+
yield * batchedBytes(source, {
77+
size: minSendBytes,
78+
serialize: (obj, list) => {
79+
for (const m of obj) {
80+
encoder.write(m, list)
81+
}
82+
}
83+
})
6884
}

src/index.ts

+12
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,18 @@ export interface MplexInit {
1919
*/
2020
maxUnprocessedMessageQueueSize?: number
2121

22+
/**
23+
* Each byte array written into a multiplexed stream is converted to one or
24+
* more messages which are sent as byte arrays to the remote node. Sending
25+
* lots of small messages can be expensive - use this setting to batch up
26+
* the serialized bytes of all messages sent during the current tick up to
27+
* this limit to send in one go similar to Nagle's algorithm. N.b. you
28+
* should benchmark your application carefully when using this setting as it
29+
* may cause the opposite of the desired effect. Omit this setting to send
30+
* all messages as they become available. (default: undefined)
31+
*/
32+
minSendBytes?: number
33+
2234
/**
2335
* The maximum number of multiplexed streams that can be open at any
2436
* one time. A request to open more than this will have a stream

src/mplex.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ export class MplexStreamMuxer implements StreamMuxer {
231231
onEnd
232232
})
233233

234-
return Object.assign(encode(source), {
234+
return Object.assign(encode(source, this._init.minSendBytes), {
235235
push: source.push,
236236
end: source.end,
237237
return: source.return

test/mplex.spec.ts

+59
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,17 @@ describe('mplex', () => {
6767
stream.end()
6868

6969
const bufs: Uint8Array[] = []
70+
const sinkDone = pDefer()
7071

7172
void Promise.resolve().then(async () => {
7273
for await (const buf of muxer.source) {
7374
bufs.push(buf)
7475
}
76+
sinkDone.resolve()
7577
})
7678

7779
await muxer.sink(stream)
80+
await sinkDone.promise
7881

7982
const messages = await all(decode()(bufs))
8083

@@ -162,4 +165,60 @@ describe('mplex', () => {
162165
expect(messages).to.have.nested.property('[0].id', id)
163166
expect(messages).to.have.nested.property('[0].type', MessageTypes.RESET_RECEIVER)
164167
})
168+
169+
it('should batch bytes to send', async () => {
170+
const minSendBytes = 10
171+
172+
// input bytes, smaller than batch size
173+
const input: Uint8Array[] = [
174+
Uint8Array.from([0, 1, 2, 3, 4]),
175+
Uint8Array.from([0, 1, 2, 3, 4]),
176+
Uint8Array.from([0, 1, 2, 3, 4])
177+
]
178+
179+
// create the muxer
180+
const factory = mplex({
181+
minSendBytes
182+
})()
183+
const muxer = factory.createStreamMuxer({})
184+
185+
// collect outgoing mplex messages
186+
const muxerFinished = pDefer()
187+
let output: Uint8Array[] = []
188+
void Promise.resolve().then(async () => {
189+
output = await all(muxer.source)
190+
muxerFinished.resolve()
191+
})
192+
193+
// create a stream
194+
const stream = await muxer.newStream()
195+
const streamFinished = pDefer()
196+
// send messages over the stream
197+
void Promise.resolve().then(async () => {
198+
await stream.sink(async function * () {
199+
yield * input
200+
}())
201+
stream.close()
202+
streamFinished.resolve()
203+
})
204+
205+
// wait for all data to be sent over the stream
206+
await streamFinished.promise
207+
208+
// close the muxer
209+
await muxer.sink([])
210+
211+
// wait for all output to be collected
212+
await muxerFinished.promise
213+
214+
// last message is unbatched
215+
const closeMessage = output.pop()
216+
expect(closeMessage).to.have.lengthOf(2)
217+
218+
// all other messages should be above or equal to the batch size
219+
expect(output).to.have.lengthOf(2)
220+
for (const buf of output) {
221+
expect(buf).to.have.length.that.is.at.least(minSendBytes)
222+
}
223+
})
165224
})

0 commit comments

Comments
 (0)