Skip to content

Commit a51d085

Browse files
authored
Allow multiple readers at once (#121)
1 parent 9749f68 commit a51d085

File tree

5 files changed

+391
-6
lines changed

5 files changed

+391
-6
lines changed

package.json

+4
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,13 @@
4242
"object",
4343
"concat"
4444
],
45+
"dependencies": {
46+
"is-stream": "^4.0.1"
47+
},
4548
"devDependencies": {
4649
"@types/node": "^20.8.9",
4750
"ava": "^5.3.1",
51+
"onetime": "^7.0.0",
4852
"precise-now": "^3.0.0",
4953
"stream-json": "^1.8.0",
5054
"tsd": "^0.29.0",

source/contents.js

+4-6
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1+
import {getAsyncIterable} from './stream.js';
2+
13
export const getStreamContents = async (stream, {init, convertChunk, getSize, truncateChunk, addChunk, getFinalChunk, finalize}, {maxBuffer = Number.POSITIVE_INFINITY} = {}) => {
2-
if (!isAsyncIterable(stream)) {
3-
throw new Error('The first argument must be a Readable, a ReadableStream, or an async iterable.');
4-
}
4+
const asyncIterable = getAsyncIterable(stream);
55

66
const state = init();
77
state.length = 0;
88

99
try {
10-
for await (const chunk of stream) {
10+
for await (const chunk of asyncIterable) {
1111
const chunkType = getChunkType(chunk);
1212
const convertedChunk = convertChunk[chunkType](chunk, state);
1313
appendChunk({convertedChunk, state, getSize, truncateChunk, addChunk, maxBuffer});
@@ -52,8 +52,6 @@ const addNewChunk = (convertedChunk, state, addChunk, newLength) => {
5252
state.length = newLength;
5353
};
5454

55-
const isAsyncIterable = stream => typeof stream === 'object' && stream !== null && typeof stream[Symbol.asyncIterator] === 'function';
56-
5755
const getChunkType = chunk => {
5856
const typeOfChunk = typeof chunk;
5957

source/stream.js

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import {isReadableStream} from 'is-stream';
2+
3+
export const getAsyncIterable = stream => {
4+
if (isReadableStream(stream, {checkOpen: false})) {
5+
return getStreamIterable(stream);
6+
}
7+
8+
if (typeof stream?.[Symbol.asyncIterator] !== 'function') {
9+
throw new TypeError('The first argument must be a Readable, a ReadableStream, or an async iterable.');
10+
}
11+
12+
return stream;
13+
};
14+
15+
// The default iterable for Node.js streams does not allow for multiple readers at once, so we re-implement it
16+
const getStreamIterable = async function * (stream) {
17+
if (nodeImports === undefined) {
18+
await loadNodeImports();
19+
}
20+
21+
const controller = new AbortController();
22+
const state = {};
23+
handleStreamEnd(stream, controller, state);
24+
25+
try {
26+
for await (const [chunk] of nodeImports.events.on(stream, 'data', {
27+
signal: controller.signal,
28+
highWatermark: stream.readableHighWaterMark,
29+
})) {
30+
yield chunk;
31+
}
32+
} catch (error) {
33+
// Stream failure, for example due to `stream.destroy(error)`
34+
if (state.error !== undefined) {
35+
throw state.error;
36+
// `error` event directly emitted on stream
37+
} else if (!controller.signal.aborted) {
38+
throw error;
39+
// Otherwise, stream completed successfully
40+
}
41+
// The `finally` block also runs when the caller throws, for example due to the `maxBuffer` option
42+
} finally {
43+
stream.destroy();
44+
}
45+
};
46+
47+
const handleStreamEnd = async (stream, controller, state) => {
48+
try {
49+
await nodeImports.streamPromises.finished(stream, {cleanup: true, readable: true, writable: false, error: false});
50+
} catch (error) {
51+
state.error = error;
52+
} finally {
53+
controller.abort();
54+
}
55+
};
56+
57+
// Use dynamic imports to support browsers
58+
const loadNodeImports = async () => {
59+
const [events, streamPromises] = await Promise.all([
60+
import('node:events'),
61+
import('node:stream/promises'),
62+
]);
63+
nodeImports = {events, streamPromises};
64+
};
65+
66+
let nodeImports;

test/fixtures/index.js

+2
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,5 @@ export const fixtureMultibyteString = '\u1000';
3131
export const longMultibyteString = `${fixtureMultibyteString}\u1000`;
3232

3333
export const bigArray = Array.from({length: 1e5}, () => Math.floor(Math.random() * (2 ** 8)));
34+
35+
export const prematureClose = {code: 'ERR_STREAM_PREMATURE_CLOSE'};

0 commit comments

Comments
 (0)