Skip to content

Commit 4218e2e

Browse files
tkrotoffgreglo
authored andcommitted
Improve the examples, some were missing
See https://streams.spec.whatwg.org/#creating-examples
1 parent e447883 commit 4218e2e

File tree

1 file changed

+227
-60
lines changed

1 file changed

+227
-60
lines changed

whatwg-streams/whatwg-streams-tests.ts

+227-60
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,39 @@
1-
function makeReadableWebSocketStream(url: string, protocols: string[]) {
2-
const ws = new WebSocket(url, protocols);
3-
ws.binaryType = "arraybuffer";
1+
/// <reference types="node" />
42

5-
return new ReadableStream({
6-
start(controller) {
7-
ws.onmessage = event => controller.enqueue(event.data);
8-
ws.onclose = () => controller.close();
9-
ws.onerror = () => controller.error(new Error("The WebSocket errored!"));
10-
},
3+
// Examples taken from https://streams.spec.whatwg.org/#creating-examples
114

12-
cancel() {
13-
ws.close();
14-
}
15-
});
5+
// 8.1. A readable stream with an underlying push source (no backpressure support)
6+
7+
{
8+
function makeReadableWebSocketStream(url: string, protocols: string | string[]) {
9+
const ws = new WebSocket(url, protocols);
10+
ws.binaryType = "arraybuffer";
11+
12+
return new ReadableStream({
13+
start(controller) {
14+
ws.onmessage = event => controller.enqueue(event.data);
15+
ws.onclose = () => controller.close();
16+
ws.onerror = () => controller.error(new Error("The WebSocket errored!"));
17+
},
18+
19+
cancel() {
20+
ws.close();
21+
}
22+
});
23+
}
24+
25+
const writableStream = new WritableStream();
26+
27+
const webSocketStream = makeReadableWebSocketStream("wss://example.com:443/", "protocol");
28+
29+
webSocketStream.pipeTo(writableStream)
30+
.then(() => console.log("All data successfully written!"))
31+
.catch(e => console.error("Something went wrong!", e));
1632
}
1733

34+
35+
// 8.2. A readable stream with an underlying push source and backpressure support
36+
1837
function makeReadableBackpressureSocketStream(host: string, port: number) {
1938
const socket = createBackpressureSocket(host, port);
2039

@@ -49,6 +68,9 @@ function makeReadableBackpressureSocketStream(host: string, port: number) {
4968
function createBackpressureSocket(host: string, port: number): any { };
5069
}
5170

71+
72+
// 8.3. A readable byte stream with an underlying push source (no backpressure support)
73+
5274
const DEFAULT_CHUNK_SIZE = 65536;
5375

5476
function makeUDPSocketStream(host: string, port: number) {
@@ -93,7 +115,100 @@ function makeUDPSocketStream(host: string, port: number) {
93115
function createUDPSocket(host: string, port: number): any { };
94116
}
95117

96-
function makeWritableWebSocketStream(url: string, protocols: string[]) {
118+
119+
// 8.4. A readable stream with an underlying pull source
120+
121+
//const fs = require("pr/fs"); // https://github.com/jden/pr
122+
interface fs {
123+
open(path: string | Buffer, flags: string | number): Promise<number>;
124+
read(fd: number, buffer: Buffer, offset: number, length: number, position: number): Promise<number>;
125+
write(fd: number, buffer: Buffer, offset: number, length: number): Promise<number>;
126+
close(fd: number): Promise<void>;
127+
}
128+
let fs: fs;
129+
130+
{
131+
const CHUNK_SIZE = 1024;
132+
133+
function makeReadableFileStream(filename: string) {
134+
let fd: number;
135+
let position = 0;
136+
137+
return new ReadableStream({
138+
start() {
139+
return fs.open(filename, "r").then(result => {
140+
fd = result;
141+
});
142+
},
143+
144+
pull(controller) {
145+
const buffer = new ArrayBuffer(CHUNK_SIZE);
146+
147+
return fs.read(fd, <any> buffer, 0, CHUNK_SIZE, position).then(bytesRead => {
148+
if (bytesRead === 0) {
149+
return fs.close(fd).then(() => controller.close());
150+
} else {
151+
position += bytesRead;
152+
controller.enqueue(new Uint8Array(buffer, 0, bytesRead));
153+
}
154+
});
155+
},
156+
157+
cancel() {
158+
return fs.close(fd);
159+
}
160+
});
161+
}
162+
}
163+
164+
165+
// 8.5. A readable byte stream with an underlying pull source
166+
167+
{
168+
//const fs = require("pr/fs"); // https://github.com/jden/pr
169+
const DEFAULT_CHUNK_SIZE = 1024;
170+
171+
function makeReadableByteFileStream(filename: string) {
172+
let fd: number;
173+
let position = 0;
174+
175+
return new ReadableStream({
176+
type: "bytes",
177+
178+
start() {
179+
return fs.open(filename, "r").then(result => {
180+
fd = result;
181+
});
182+
},
183+
184+
pull(controller) {
185+
// Even when the consumer is using the default reader, the auto-allocation
186+
// feature allocates a buffer and passes it to us via byobRequest.
187+
const v = controller.byobRequest.view;
188+
189+
return fs.read(fd, <any> v.buffer, v.byteOffset, v.byteLength, position).then(bytesRead => {
190+
if (bytesRead === 0) {
191+
return fs.close(fd).then(() => controller.close());
192+
} else {
193+
position += bytesRead;
194+
controller.byobRequest.respond(bytesRead);
195+
}
196+
});
197+
},
198+
199+
cancel() {
200+
return fs.close(fd);
201+
},
202+
203+
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE
204+
});
205+
}
206+
}
207+
208+
209+
// 8.6. A writable stream with no backpressure or success signals
210+
211+
function makeWritableWebSocketStream(url: string, protocols: string | string[]) {
97212
const ws = new WebSocket(url, protocols);
98213

99214
return new WritableStream({
@@ -117,71 +232,123 @@ function makeWritableWebSocketStream(url: string, protocols: string[]) {
117232
});
118233
}
119234

120-
function streamifyWebSocket(url: string, protocol: string) {
121-
const ws = new WebSocket(url, protocol);
122-
ws.binaryType = "arraybuffer";
235+
{
236+
const readableStream = new ReadableStream();
237+
238+
const webSocketStream = makeWritableWebSocketStream("wss://example.com:443/", "protocol");
123239

124-
return {
125-
readable: new ReadableStream(new WebSocketSource(ws)),
126-
writable: new WritableStream(new WebSocketSink(ws))
127-
};
240+
readableStream.pipeTo(webSocketStream)
241+
.then(() => console.log("All data successfully written!"))
242+
.catch(e => console.error("Something went wrong!", e));
128243
}
129244

130-
class WebSocketSource implements ReadableStreamSource {
131-
private _ws: WebSocket
132245

133-
constructor(ws: WebSocket) {
134-
this._ws = ws;
135-
}
246+
// 8.7. A writable stream with backpressure and success signals
136247

137-
start(controller: ReadableStreamDefaultController) {
138-
this._ws.onmessage = event => controller.enqueue(event.data);
139-
this._ws.onclose = () => controller.close();
248+
{
249+
//const fs = require("pr/fs"); // https://github.com/jden/pr
140250

141-
this._ws.addEventListener("error", () => {
142-
controller.error(new Error("The WebSocket errored!"));
251+
function makeWritableFileStream(filename: string) {
252+
let fd: number;
253+
254+
return new WritableStream({
255+
start() {
256+
return fs.open(filename, "w").then(result => {
257+
fd = result;
258+
});
259+
},
260+
261+
write(chunk) {
262+
return fs.write(fd, chunk, 0, chunk.length);
263+
},
264+
265+
close() {
266+
return fs.close(fd);
267+
}
143268
});
144269
}
145270

146-
cancel() {
147-
this._ws.close();
148-
}
271+
const fileStream = makeWritableFileStream("/example/path/on/fs.txt");
272+
const writer = fileStream.getWriter();
273+
274+
writer.write("To stream, or not to stream\n");
275+
writer.write("That is the question\n");
276+
277+
writer.close()
278+
.then(() => console.log("chunks written and stream closed successfully!"))
279+
.catch(e => console.error(e));
149280
}
150281

151-
class WebSocketSink implements WritableStreamSink {
152-
private _ws: WebSocket
153282

154-
constructor(ws: WebSocket) {
155-
this._ws = ws;
156-
}
283+
// 8.8. A { readable, writable } stream pair wrapping the same underlying resource
157284

158-
start(controller: WritableStreamDefaultController) {
159-
this._ws.addEventListener("error", () => {
160-
controller.error(new Error("The WebSocket errored!"));
161-
});
285+
{
286+
function streamifyWebSocket(url: string, protocol: string) {
287+
const ws = new WebSocket(url, protocol);
288+
ws.binaryType = "arraybuffer";
162289

163-
return new Promise<void>(resolve => this._ws.onopen = () => resolve());
290+
return {
291+
readable: new ReadableStream(new WebSocketSource(ws)),
292+
writable: new WritableStream(new WebSocketSink(ws))
293+
};
164294
}
165295

166-
write(chunk: any) {
167-
this._ws.send(chunk);
168-
}
296+
class WebSocketSource implements ReadableStreamSource {
297+
private _ws: WebSocket
298+
299+
constructor(ws: WebSocket) {
300+
this._ws = ws;
301+
}
169302

170-
close() {
171-
return new Promise<void>((resolve, reject) => {
172-
this._ws.onclose = () => resolve();
303+
start(controller: ReadableStreamDefaultController) {
304+
this._ws.onmessage = event => controller.enqueue(event.data);
305+
this._ws.onclose = () => controller.close();
306+
307+
this._ws.addEventListener("error", () => {
308+
controller.error(new Error("The WebSocket errored!"));
309+
});
310+
}
311+
312+
cancel() {
173313
this._ws.close();
174-
});
314+
}
175315
}
176-
}
177316

178-
const streamyWS = streamifyWebSocket("wss://example.com:443/", "protocol");
179-
const writer = streamyWS.writable.getWriter();
180-
const reader = streamyWS.readable.getReader();
317+
class WebSocketSink implements WritableStreamSink {
318+
private _ws: WebSocket
319+
320+
constructor(ws: WebSocket) {
321+
this._ws = ws;
322+
}
323+
324+
start(controller: WritableStreamDefaultController) {
325+
this._ws.addEventListener("error", () => {
326+
controller.error(new Error("The WebSocket errored!"));
327+
});
181328

182-
writer.write("Hello");
183-
writer.write("web socket!");
329+
return new Promise<void>(resolve => this._ws.onopen = () => resolve());
330+
}
184331

185-
reader.read().then(({ value, done }) => {
186-
console.log("The web socket says: ", value);
187-
});
332+
write(chunk: any) {
333+
this._ws.send(chunk);
334+
}
335+
336+
close() {
337+
return new Promise<void>((resolve, reject) => {
338+
this._ws.onclose = () => resolve();
339+
this._ws.close();
340+
});
341+
}
342+
}
343+
344+
const streamyWS = streamifyWebSocket("wss://example.com:443/", "protocol");
345+
const writer = streamyWS.writable.getWriter();
346+
const reader = streamyWS.readable.getReader();
347+
348+
writer.write("Hello");
349+
writer.write("web socket!");
350+
351+
reader.read().then(({ value, done }) => {
352+
console.log("The web socket says: ", value);
353+
});
354+
}

0 commit comments

Comments
 (0)