Skip to content

Commit 6156f8a

Browse files
jakecastellinodejs-github-bot
authored andcommitted
Revert "stream: handle generator destruction from Duplex.from()"
This reverts commit 5541300. PR-URL: #56278 Reviewed-By: Matteo Collina <[email protected]>
1 parent a9e65f6 commit 6156f8a

File tree

2 files changed

+7
-243
lines changed

2 files changed

+7
-243
lines changed

lib/internal/streams/duplexify.js

Lines changed: 7 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -83,19 +83,15 @@ module.exports = function duplexify(body, name) {
8383
}
8484

8585
if (typeof body === 'function') {
86-
let d;
87-
88-
const { value, write, final, destroy } = fromAsyncGen(body, () => {
89-
destroyer(d);
90-
});
86+
const { value, write, final, destroy } = fromAsyncGen(body);
9187

9288
// Body might be a constructor function instead of an async generator function.
9389
if (isDuplexNodeStream(value)) {
94-
return d = value;
90+
return value;
9591
}
9692

9793
if (isIterable(value)) {
98-
return d = from(Duplexify, value, {
94+
return from(Duplexify, value, {
9995
// TODO (ronag): highWaterMark?
10096
objectMode: true,
10197
write,
@@ -106,16 +102,12 @@ module.exports = function duplexify(body, name) {
106102

107103
const then = value?.then;
108104
if (typeof then === 'function') {
109-
let finalized = false;
105+
let d;
110106

111107
const promise = FunctionPrototypeCall(
112108
then,
113109
value,
114110
(val) => {
115-
// The function returned without (fully) consuming the generator.
116-
if (!finalized) {
117-
destroyer(d);
118-
}
119111
if (val != null) {
120112
throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val);
121113
}
@@ -131,7 +123,6 @@ module.exports = function duplexify(body, name) {
131123
readable: false,
132124
write,
133125
final(cb) {
134-
finalized = true;
135126
final(async () => {
136127
try {
137128
await promise;
@@ -217,12 +208,11 @@ module.exports = function duplexify(body, name) {
217208
body);
218209
};
219210

220-
function fromAsyncGen(fn, destructor) {
211+
function fromAsyncGen(fn) {
221212
let { promise, resolve } = PromiseWithResolvers();
222213
const ac = new AbortController();
223214
const signal = ac.signal;
224-
225-
const asyncGenerator = (async function* () {
215+
const value = fn(async function*() {
226216
while (true) {
227217
const _promise = promise;
228218
promise = null;
@@ -232,44 +222,9 @@ function fromAsyncGen(fn, destructor) {
232222
if (signal.aborted)
233223
throw new AbortError(undefined, { cause: signal.reason });
234224
({ promise, resolve } = PromiseWithResolvers());
235-
// Next line will "break" the loop if the generator is returned/thrown.
236225
yield chunk;
237226
}
238-
})();
239-
240-
const originalReturn = asyncGenerator.return;
241-
asyncGenerator.return = async function(value) {
242-
try {
243-
return await originalReturn.call(this, value);
244-
} finally {
245-
if (promise) {
246-
const _promise = promise;
247-
promise = null;
248-
const { cb } = await _promise;
249-
process.nextTick(cb);
250-
251-
process.nextTick(destructor);
252-
}
253-
}
254-
};
255-
256-
const originalThrow = asyncGenerator.throw;
257-
asyncGenerator.throw = async function(err) {
258-
try {
259-
return await originalThrow.call(this, err);
260-
} finally {
261-
if (promise) {
262-
const _promise = promise;
263-
promise = null;
264-
const { cb } = await _promise;
265-
266-
// asyncGenerator.throw(undefined) should cause a callback error
267-
process.nextTick(cb, err ?? new AbortError());
268-
}
269-
}
270-
};
271-
272-
const value = fn(asyncGenerator, { signal });
227+
}(), { signal });
273228

274229
return {
275230
value,

test/parallel/test-stream-duplex-from.js

Lines changed: 0 additions & 191 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ const assert = require('assert');
55
const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream');
66
const { ReadableStream, WritableStream } = require('stream/web');
77
const { Blob } = require('buffer');
8-
const sleep = require('util').promisify(setTimeout);
98

109
{
1110
const d = Duplex.from({
@@ -402,193 +401,3 @@ function makeATestWritableStream(writeFunc) {
402401
assert.strictEqual(d.writable, false);
403402
}));
404403
}
405-
406-
{
407-
const r = Readable.from(['foo', 'bar', 'baz']);
408-
pipeline(
409-
r,
410-
Duplex.from(async function(asyncGenerator) {
411-
const values = await Array.fromAsync(asyncGenerator);
412-
assert.deepStrictEqual(values, ['foo', 'bar', 'baz']);
413-
414-
await asyncGenerator.return();
415-
await asyncGenerator.return();
416-
await asyncGenerator.return();
417-
}),
418-
common.mustSucceed(() => {
419-
assert.strictEqual(r.destroyed, true);
420-
})
421-
);
422-
}
423-
424-
{
425-
const r = Readable.from(['foo', 'bar', 'baz']);
426-
pipeline(
427-
r,
428-
Duplex.from(async function(asyncGenerator) {
429-
// eslint-disable-next-line no-unused-vars
430-
for await (const _ of asyncGenerator) break;
431-
}),
432-
common.mustSucceed(() => {
433-
assert.strictEqual(r.destroyed, true);
434-
})
435-
);
436-
}
437-
438-
{
439-
const r = Readable.from(['foo', 'bar', 'baz']);
440-
pipeline(
441-
r,
442-
Duplex.from(async function(asyncGenerator) {
443-
const a = await asyncGenerator.next();
444-
assert.strictEqual(a.done, false);
445-
assert.strictEqual(a.value.toString(), 'foo');
446-
const b = await asyncGenerator.return();
447-
assert.strictEqual(b.done, true);
448-
}),
449-
common.mustSucceed(() => {
450-
assert.strictEqual(r.destroyed, true);
451-
})
452-
);
453-
}
454-
455-
{
456-
const r = Readable.from(['foo', 'bar', 'baz']);
457-
pipeline(
458-
r,
459-
Duplex.from(async function(asyncGenerator) {
460-
// Note: the generator is not even started at this point
461-
await asyncGenerator.return();
462-
}),
463-
common.mustSucceed(() => {
464-
assert.strictEqual(r.destroyed, true);
465-
})
466-
);
467-
}
468-
469-
{
470-
const r = Readable.from(['foo', 'bar', 'baz']);
471-
pipeline(
472-
r,
473-
Duplex.from(async function(asyncGenerator) {
474-
// Same as before, with a delay
475-
await sleep(100);
476-
await asyncGenerator.return();
477-
}),
478-
common.mustSucceed(() => {
479-
assert.strictEqual(r.destroyed, true);
480-
})
481-
);
482-
}
483-
484-
{
485-
const r = Readable.from(['foo', 'bar', 'baz']);
486-
pipeline(
487-
r,
488-
Duplex.from(async function(asyncGenerator) {}),
489-
common.mustCall((err) => {
490-
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
491-
assert.strictEqual(r.destroyed, true);
492-
})
493-
);
494-
}
495-
496-
{
497-
const r = Readable.from(['foo', 'bar', 'baz']);
498-
pipeline(
499-
r,
500-
Duplex.from(async function(asyncGenerator) {
501-
await sleep(100);
502-
}),
503-
common.mustCall((err) => {
504-
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
505-
assert.strictEqual(r.destroyed, true);
506-
})
507-
);
508-
}
509-
510-
{
511-
const r = Readable.from(['foo', 'bar', 'baz']);
512-
const d = Duplex.from(async function(asyncGenerator) {
513-
while (!(await asyncGenerator.next()).done) await sleep(100);
514-
});
515-
516-
setTimeout(() => d.destroy(), 150);
517-
518-
pipeline(
519-
r,
520-
d,
521-
common.mustCall((err) => {
522-
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
523-
assert.strictEqual(r.destroyed, true);
524-
})
525-
);
526-
}
527-
528-
{
529-
const r = Duplex.from(async function* () {
530-
for (const value of ['foo', 'bar', 'baz']) {
531-
await sleep(50);
532-
yield value;
533-
}
534-
});
535-
const d = Duplex.from(async function(asyncGenerator) {
536-
while (!(await asyncGenerator.next()).done);
537-
});
538-
539-
setTimeout(() => r.destroy(), 75);
540-
541-
pipeline(
542-
r,
543-
d,
544-
common.mustCall((err) => {
545-
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
546-
assert.strictEqual(r.destroyed, true);
547-
assert.strictEqual(d.destroyed, true);
548-
})
549-
);
550-
}
551-
552-
{
553-
const r = Readable.from(['foo']);
554-
pipeline(
555-
r,
556-
Duplex.from(async function(asyncGenerator) {
557-
await asyncGenerator.throw(new Error('my error'));
558-
}),
559-
common.mustCall((err) => {
560-
assert.strictEqual(err.message, 'my error');
561-
assert.strictEqual(r.destroyed, true);
562-
})
563-
);
564-
}
565-
566-
{
567-
const r = Readable.from(['foo', 'bar']);
568-
pipeline(
569-
r,
570-
Duplex.from(async function(asyncGenerator) {
571-
await asyncGenerator.next();
572-
await asyncGenerator.throw(new Error('my error'));
573-
}),
574-
common.mustCall((err) => {
575-
assert.strictEqual(err.message, 'my error');
576-
assert.strictEqual(r.destroyed, true);
577-
})
578-
);
579-
}
580-
581-
{
582-
const r = Readable.from(['foo', 'bar']);
583-
pipeline(
584-
r,
585-
Duplex.from(async function(asyncGenerator) {
586-
await asyncGenerator.next();
587-
await asyncGenerator.throw();
588-
}),
589-
common.mustCall((err) => {
590-
assert.strictEqual(err.code, 'ABORT_ERR');
591-
assert.strictEqual(r.destroyed, true);
592-
})
593-
);
594-
}

0 commit comments

Comments
 (0)