-
-
Notifications
You must be signed in to change notification settings - Fork 31.7k
stream: abort signal in readable streams #36061
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,8 +45,8 @@ There are four fundamental stream types within Node.js: | |
is written and read (for example, [`zlib.createDeflate()`][]). | ||
|
||
Additionally, this module includes the utility functions | ||
[`stream.pipeline()`][], [`stream.finished()`][] and | ||
[`stream.Readable.from()`][]. | ||
[`stream.pipeline()`][], [`stream.finished()`][], [`stream.Readable.from()`][] | ||
and [`stream.addAbortSignal()`][]. | ||
|
||
### Streams Promises API | ||
<!-- YAML | ||
|
@@ -1799,6 +1799,55 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have | |
the strings or buffers be iterated to match the other streams semantics | ||
for performance reasons. | ||
|
||
### `stream.addAbortSignal(signal, stream)` | ||
<!-- YAML | ||
added: REPLACEME | ||
--> | ||
* `signal` {AbortSignal} A signal representing possible cancellation | ||
* `stream` {Stream} a stream to attach a signal to | ||
|
||
Attaches an AbortSignal to a readable or writeable stream. This lets code | ||
control stream destruction using an `AbortController`. | ||
|
||
Calling `abort` on the `AbortController` corresponding to the passed | ||
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())` | ||
on the stream. | ||
|
||
```js | ||
const fs = require('fs'); | ||
|
||
const controller = new AbortController(); | ||
const read = addAbortSignal( | ||
controller.signal, | ||
fs.createReadStream(('object.json')) | ||
); | ||
// Later, abort the operation closing the stream | ||
controller.abort(); | ||
``` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An example that shows this used with an async iterator would be good. |
||
|
||
Or using an `AbortSignal` with a readable stream as an async iterable: | ||
|
||
```js | ||
const controller = new AbortController(); | ||
setTimeout(() => controller.abort(), 10_000); // set a timeout | ||
const stream = addAbortSignal( | ||
controller.signal, | ||
fs.createReadStream(('object.json')) | ||
); | ||
(async () => { | ||
try { | ||
for await (const chunk of stream) { | ||
await process(chunk); | ||
} | ||
} catch (e) { | ||
if (e.name === 'AbortError') { | ||
// The operation was cancelled | ||
} else { | ||
throw e; | ||
} | ||
} | ||
})(); | ||
``` | ||
## API for stream implementers | ||
|
||
<!--type=misc--> | ||
|
@@ -3123,6 +3172,7 @@ contain multi-byte characters. | |
[`stream.finished()`]: #stream_stream_finished_stream_options_callback | ||
[`stream.pipe()`]: #stream_readable_pipe_destination_options | ||
[`stream.pipeline()`]: #stream_stream_pipeline_source_transforms_destination_callback | ||
[`stream.addAbortSignal()`]: #stream_stream_addabortsignal_signal_stream | ||
[`stream.uncork()`]: #stream_writable_uncork | ||
[`stream.unpipe()`]: #stream_readable_unpipe_destination | ||
[`stream.wrap()`]: #stream_readable_wrap_stream | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
'use strict'; | ||
|
||
const { | ||
AbortError, | ||
codes, | ||
} = require('internal/errors'); | ||
|
||
const eos = require('internal/streams/end-of-stream'); | ||
const { ERR_INVALID_ARG_TYPE } = codes; | ||
|
||
// This method is inlined here for readable-stream | ||
// https://github.com/nodejs/node/pull/36061#discussion_r533718029 | ||
const validateAbortSignal = (signal, name) => { | ||
if (signal !== undefined && | ||
(signal === null || | ||
typeof signal !== 'object' || | ||
!('aborted' in signal))) { | ||
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal); | ||
} | ||
}; | ||
|
||
function isStream(obj) { | ||
return !!(obj && typeof obj.pipe === 'function'); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this works as you think it would. I think this passes for every descendant of Stream. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think so too and so do the tests - but I just copied |
||
|
||
module.exports = function addAbortSignal(signal, stream) { | ||
validateAbortSignal(signal, 'signal'); | ||
if (!isStream(stream)) { | ||
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream); | ||
} | ||
benjamingr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
const onAbort = () => { | ||
stream.destroy(new AbortError()); | ||
}; | ||
if (signal.aborted) { | ||
onAbort(); | ||
ronag marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} else { | ||
signal.addEventListener('abort', onAbort); | ||
eos(stream, () => signal.removeEventListener('abort', onAbort)); | ||
} | ||
return stream; | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
'use strict'; | ||
|
||
const common = require('../common'); | ||
const { Readable } = require('stream'); | ||
const { Readable, addAbortSignal } = require('stream'); | ||
const assert = require('assert'); | ||
|
||
{ | ||
|
@@ -268,3 +268,38 @@ const assert = require('assert'); | |
})); | ||
read.resume(); | ||
} | ||
|
||
{ | ||
const controller = new AbortController(); | ||
const read = addAbortSignal(controller.signal, new Readable({ | ||
read() { | ||
this.push('asd'); | ||
}, | ||
})); | ||
|
||
read.on('error', common.mustCall((e) => { | ||
assert.strictEqual(e.name, 'AbortError'); | ||
})); | ||
controller.abort(); | ||
read.on('data', common.mustNotCall()); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add tests for writable as well. |
||
|
||
{ | ||
const controller = new AbortController(); | ||
const read = addAbortSignal(controller.signal, new Readable({ | ||
objectMode: true, | ||
read() { | ||
return false; | ||
} | ||
})); | ||
read.push('asd'); | ||
|
||
read.on('error', common.mustCall((e) => { | ||
assert.strictEqual(e.name, 'AbortError'); | ||
})); | ||
assert.rejects((async () => { | ||
/* eslint-disable-next-line no-unused-vars */ | ||
for await (const chunk of read) {} | ||
})(), /AbortError/); | ||
setTimeout(() => controller.abort(), 0); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A test that sets up a pipeline that uses AbortController would be good, as would a test using a single AbortController for two different Readables at the same time. |
Uh oh!
There was an error while loading. Please reload this page.