Skip to content

Commit 08dfa68

Browse files
committed
Add IsReadableStreamDisturbed predicate
See the suggestion in the OP of #378.
1 parent bc45171 commit 08dfa68

File tree

4 files changed

+57
-0
lines changed

4 files changed

+57
-0
lines changed

index.bs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,7 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
387387
1. Set *this*@[[state]] to "readable".
388388
1. Set *this*@[[started]], *this*@[[closeRequested]], *this*@[[pullAgain]], and *this*@[[pulling]] to *false*.
389389
1. Set *this*@[[reader]] and *this*@[[storedError]] to *undefined*.
390+
1. Set *this*@[[disturbed]] to *false*.
390391
1. Set *this*@[[controller]] to Construct(`ReadableStreamController`, «*this*»).
391392
1. Let _normalizedStrategy_ be ValidateAndNormalizeQueuingStrategy(_size_, _highWaterMark_).
392393
1. Set *this*@[[strategySize]] to _normalizedStrategy_.[[size]] and *this*@[[strategyHWM]] to _normalizedStrategy_.[[highWaterMark]].
@@ -855,6 +856,7 @@ reader</a> for a given stream.
855856
<emu-alg>
856857
1. If _stream_@[[state]] is "closed", return a new promise resolved with *undefined*.
857858
1. If _stream_@[[state]] is "errored", return a new promise rejected with _stream_@[[storedError]].
859+
1. Set _stream_@[[disturbed]] to *true*.
858860
1. Set _stream_@[[queue]] to a new empty List.
859861
1. Perform FinishClosingReadableStream(_stream_).
860862
1. Let _sourceCancelPromise_ be PromiseInvokeOrNoop(_stream_@[[underlyingSource]], "cancel", «‍_reason_»).
@@ -971,6 +973,16 @@ an assert).
971973
1. Return *true*.
972974
</emu-alg>
973975

976+
<h4 id="is-readable-stream-disturbed" aoid="IsReadableStreamDisturbed">IsReadableStreamDisturbed ( stream )</h4>
977+
978+
This abstract operation is meant to be called from other specifications that may wish to query whether or not a
979+
readable stream is ever attemped to be read or cancelled.
980+
981+
<emu-alg>
982+
1. Assert: IsReadableStream(_stream_) is *true*.
983+
1. Return _stream_@[[disturbed]].
984+
</emu-alg>
985+
974986
<h4 id="is-readable-stream-locked" aoid="IsReadableStreamLocked">IsReadableStreamLocked ( stream )</h4>
975987

976988
This abstract operation is meant to be called from other specifications that may wish to query whether or not a
@@ -997,6 +1009,7 @@ readable stream is <a>locked to a reader</a>.
9971009
1. If _reader_@[[state]] is "errored", return a new promise rejected with _reader_@[[storedError]].
9981010
1. Assert: _reader_@[[ownerReadableStream]] is not *undefined*.
9991011
1. Assert: _reader_@[[ownerReadableStream]]@[[state]] is "readable".
1012+
1. Set _reader_@[[ownerReadableStream]]@[[disturbed]] to *true*.
10001013
1. If _reader_@[[ownerReadableStream]]@[[queue]] is not empty,
10011014
1. Let _chunk_ be DequeueValue(_reader_@[[ownerReadableStream]]@[[queue]]).
10021015
1. If _reader_@[[ownerReadableStream]]@[[closeRequested]] is *true* and _reader_@[[ownerReadableStream]]@[[queue]] is now empty, perform FinishClosingReadableStream(_reader_@[[ownerReadableStream]]).

reference-implementation/lib/readable-stream.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ export default class ReadableStream {
1616
this._reader = undefined;
1717
this._storedError = undefined;
1818

19+
this._disturbed = false;
20+
1921
const normalizedStrategy = ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
2022
this._strategySize = normalizedStrategy.size;
2123
this._strategyHWM = normalizedStrategy.highWaterMark;
@@ -336,6 +338,8 @@ function CancelReadableStream(stream, reason) {
336338
return Promise.reject(stream._storedError);
337339
}
338340

341+
stream._disturbed = true;
342+
339343
stream._queue = [];
340344
FinishClosingReadableStream(stream);
341345

@@ -439,6 +443,12 @@ function IsReadableStream(x) {
439443
return true;
440444
}
441445

446+
export function IsReadableStreamDisturbed(stream) {
447+
assert(IsReadableStream(stream) === true, 'IsReadableStreamDisturbed should only be used on known readable streams');
448+
449+
return stream._disturbed;
450+
}
451+
442452
function IsReadableStreamLocked(stream) {
443453
assert(IsReadableStream(stream) === true, 'IsReadableStreamLocked should only be used on known readable streams');
444454

@@ -485,6 +495,8 @@ function ReadFromReadableStreamReader(reader) {
485495
assert(reader._ownerReadableStream !== undefined);
486496
assert(reader._ownerReadableStream._state === 'readable');
487497

498+
reader._ownerReadableStream._disturbed = true;
499+
488500
if (reader._ownerReadableStream._queue.length > 0) {
489501
const chunk = DequeueValue(reader._ownerReadableStream._queue);
490502

reference-implementation/run-tests.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@ const glob = require('glob');
22
const path = require('path');
33

44
import ReadableStream from './lib/readable-stream';
5+
import { IsReadableStreamDisturbed } from './lib/readable-stream';
56
import WritableStream from './lib/writable-stream';
67
import ByteLengthQueuingStrategy from './lib/byte-length-queuing-strategy';
78
import CountQueuingStrategy from './lib/count-queuing-strategy';
89
import TransformStream from './lib/transform-stream';
910

1011
global.ReadableStream = ReadableStream;
12+
global.IsReadableStreamDisturbed = IsReadableStreamDisturbed;
1113
global.WritableStream = WritableStream;
1214
global.ByteLengthQueuingStrategy = ByteLengthQueuingStrategy;
1315
global.CountQueuingStrategy = CountQueuingStrategy;

reference-implementation/test/readable-stream.js

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -771,3 +771,33 @@ test('ReadableStream integration test: adapting an async pull source', t => {
771771
t.end();
772772
});
773773
});
774+
775+
test('IsReadableStreamDisturbed returns true for a stream on which read() has been called', t => {
776+
const rs = new ReadableStream();
777+
778+
t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction');
779+
780+
const reader = rs.getReader();
781+
t.equal(IsReadableStreamDisturbed(rs), false,
782+
'getReader() call has no effect on whether a stream is disturbed or not');
783+
784+
reader.read();
785+
t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after read() call');
786+
787+
t.end();
788+
});
789+
790+
test('IsReadableStreamDisturbed returns true for a stream on which cancel() has been called', t => {
791+
const rs = new ReadableStream();
792+
793+
t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction');
794+
795+
const reader = rs.getReader();
796+
t.equal(IsReadableStreamDisturbed(rs), false,
797+
'getReader() call has no effect on whether a stream is disturbed or not');
798+
799+
reader.cancel();
800+
t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after cancel() call');
801+
802+
t.end();
803+
});

0 commit comments

Comments
 (0)