Skip to content

Commit d4b440b

Browse files
debadree25RafaelGSS
authored andcommitted
fs: implement byob mode for readableWebStream()
Fixes: #45853 PR-URL: #46933 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 174662a commit d4b440b

File tree

3 files changed

+154
-17
lines changed

3 files changed

+154
-17
lines changed

doc/api/fs.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,14 +446,22 @@ Reads data from the file and stores that in the given buffer.
446446
If the file is not modified concurrently, the end-of-file is reached when the
447447
number of bytes read is zero.
448448
449-
#### `filehandle.readableWebStream()`
449+
#### `filehandle.readableWebStream(options)`
450450
451451
<!-- YAML
452452
added: v17.0.0
453+
changes:
454+
- version: REPLACEME
455+
pr-url: https://github.com/nodejs/node/pull/46933
456+
description: Added option to create a 'bytes' stream.
453457
-->
454458
455459
> Stability: 1 - Experimental
456460
461+
* `options` {Object}
462+
* `type` {string|undefined} Whether to open a normal or a `'bytes'` stream.
463+
**Default:** `undefined`
464+
457465
* Returns: {ReadableStream}
458466
459467
Returns a `ReadableStream` that may be used to read the files data.

lib/internal/fs/promises.js

Lines changed: 61 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const {
1414
SafePromisePrototypeFinally,
1515
Symbol,
1616
Uint8Array,
17+
FunctionPrototypeBind,
1718
} = primordials;
1819

1920
const { fs: constants } = internalBinding('constants');
@@ -249,29 +250,73 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
249250
* } ReadableStream
250251
* @returns {ReadableStream}
251252
*/
252-
readableWebStream() {
253+
readableWebStream(options = kEmptyObject) {
253254
if (this[kFd] === -1)
254255
throw new ERR_INVALID_STATE('The FileHandle is closed');
255256
if (this[kClosePromise])
256257
throw new ERR_INVALID_STATE('The FileHandle is closing');
257258
if (this[kLocked])
258259
throw new ERR_INVALID_STATE('The FileHandle is locked');
259260
this[kLocked] = true;
260-
const {
261-
newReadableStreamFromStreamBase,
262-
} = require('internal/webstreams/adapters');
263-
const readable = newReadableStreamFromStreamBase(
264-
this[kHandle],
265-
undefined,
266-
{ ondone: () => this[kUnref]() });
267-
268-
const {
269-
readableStreamCancel,
270-
} = require('internal/webstreams/readablestream');
271-
this[kRef]();
272-
this.once('close', () => {
273-
readableStreamCancel(readable);
274-
});
261+
262+
if (options.type !== undefined) {
263+
validateString(options.type, 'options.type');
264+
}
265+
266+
let readable;
267+
268+
if (options.type !== 'bytes') {
269+
const {
270+
newReadableStreamFromStreamBase,
271+
} = require('internal/webstreams/adapters');
272+
readable = newReadableStreamFromStreamBase(
273+
this[kHandle],
274+
undefined,
275+
{ ondone: () => this[kUnref]() });
276+
277+
const {
278+
readableStreamCancel,
279+
} = require('internal/webstreams/readablestream');
280+
this[kRef]();
281+
this.once('close', () => {
282+
readableStreamCancel(readable);
283+
});
284+
} else {
285+
const {
286+
readableStreamCancel,
287+
ReadableStream,
288+
} = require('internal/webstreams/readablestream');
289+
290+
const readFn = FunctionPrototypeBind(this.read, this);
291+
const ondone = FunctionPrototypeBind(this[kUnref], this);
292+
293+
readable = new ReadableStream({
294+
type: 'bytes',
295+
autoAllocateChunkSize: 16384,
296+
297+
async pull(controller) {
298+
const view = controller.byobRequest.view;
299+
const { bytesRead } = await readFn(view, view.byteOffset, view.byteLength);
300+
301+
if (bytesRead === 0) {
302+
ondone();
303+
controller.close();
304+
}
305+
306+
controller.byobRequest.respond(bytesRead);
307+
},
308+
309+
cancel() {
310+
ondone();
311+
},
312+
});
313+
314+
this[kRef]();
315+
316+
this.once('close', () => {
317+
readableStreamCancel(readable);
318+
});
319+
}
275320

276321
return readable;
277322
}

test/parallel/test-filehandle-readablestream.js

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,87 @@ const check = readFileSync(__filename, { encoding: 'utf8' });
8686
mc.port1.close();
8787
await file.close();
8888
})().then(common.mustCall());
89+
90+
// Make sure 'bytes' stream works
91+
(async () => {
92+
const file = await open(__filename);
93+
const dec = new TextDecoder();
94+
const readable = file.readableWebStream({ type: 'bytes' });
95+
const reader = readable.getReader({ mode: 'byob' });
96+
97+
let data = '';
98+
let result;
99+
do {
100+
const buff = new ArrayBuffer(100);
101+
result = await reader.read(new DataView(buff));
102+
if (result.value !== undefined) {
103+
data += dec.decode(result.value);
104+
assert.ok(result.value.byteLength <= 100);
105+
}
106+
} while (!result.done);
107+
108+
assert.strictEqual(check, data);
109+
110+
assert.throws(() => file.readableWebStream(), {
111+
code: 'ERR_INVALID_STATE',
112+
});
113+
114+
await file.close();
115+
})().then(common.mustCall());
116+
117+
// Make sure that acquiring a ReadableStream 'bytes' stream
118+
// fails if the FileHandle is already closed.
119+
(async () => {
120+
const file = await open(__filename);
121+
await file.close();
122+
123+
assert.throws(() => file.readableWebStream({ type: 'bytes' }), {
124+
code: 'ERR_INVALID_STATE',
125+
});
126+
})().then(common.mustCall());
127+
128+
// Make sure that acquiring a ReadableStream 'bytes' stream
129+
// fails if the FileHandle is already closing.
130+
(async () => {
131+
const file = await open(__filename);
132+
file.close();
133+
134+
assert.throws(() => file.readableWebStream({ type: 'bytes' }), {
135+
code: 'ERR_INVALID_STATE',
136+
});
137+
})().then(common.mustCall());
138+
139+
// Make sure the 'bytes' ReadableStream is closed when the underlying
140+
// FileHandle is closed.
141+
(async () => {
142+
const file = await open(__filename);
143+
const readable = file.readableWebStream({ type: 'bytes' });
144+
const reader = readable.getReader({ mode: 'byob' });
145+
file.close();
146+
await reader.closed;
147+
})().then(common.mustCall());
148+
149+
// Make sure the 'bytes' ReadableStream is closed when the underlying
150+
// FileHandle is closed.
151+
(async () => {
152+
const file = await open(__filename);
153+
const readable = file.readableWebStream({ type: 'bytes' });
154+
file.close();
155+
const reader = readable.getReader({ mode: 'byob' });
156+
await reader.closed;
157+
})().then(common.mustCall());
158+
159+
// Make sure that the FileHandle is properly marked "in use"
160+
// when a 'bytes' ReadableStream has been acquired for it.
161+
(async () => {
162+
const file = await open(__filename);
163+
file.readableWebStream({ type: 'bytes' });
164+
const mc = new MessageChannel();
165+
mc.port1.onmessage = common.mustNotCall();
166+
assert.throws(() => mc.port2.postMessage(file, [file]), {
167+
code: 25,
168+
name: 'DataCloneError',
169+
});
170+
mc.port1.close();
171+
await file.close();
172+
})().then(common.mustCall());

0 commit comments

Comments
 (0)