-
Notifications
You must be signed in to change notification settings - Fork 356
/
Copy pathwebtransport.ts
103 lines (89 loc) · 2.69 KB
/
webtransport.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import { Transport } from "../transport.js";
import { nextTick } from "./websocket-constructor.js";
import {
Packet,
createPacketDecoderStream,
createPacketEncoderStream,
} from "engine.io-parser";
import debugModule from "debug"; // debug()
const debug = debugModule("engine.io-client:webtransport"); // debug()
export class WT extends Transport {
private transport: any;
private writer: any;
get name() {
return "webtransport";
}
protected doOpen() {
// @ts-ignore
if (typeof WebTransport !== "function") {
return;
}
// @ts-ignore
this.transport = new WebTransport(
this.createUri("https"),
this.opts.transportOptions[this.name]
);
this.transport.closed
.then(() => {
debug("transport closed gracefully");
this.onClose();
})
.catch((err) => {
debug("transport closed due to %s", err);
this.onError("webtransport error", err);
});
// note: we could have used async/await, but that would require some additional polyfills
this.transport.ready.then(() => {
this.transport.createBidirectionalStream().then((stream) => {
const decoderStream = createPacketDecoderStream(
Number.MAX_SAFE_INTEGER,
this.socket.binaryType
);
const reader = stream.readable.pipeThrough(decoderStream).getReader();
const encoderStream = createPacketEncoderStream();
encoderStream.readable.pipeTo(stream.writable);
this.writer = encoderStream.writable.getWriter();
const read = () => {
reader
.read()
.then(({ done, value }) => {
if (done) {
debug("session is closed");
return;
}
debug("received chunk: %o", value);
this.onPacket(value);
read();
})
.catch((err) => {
debug("an error occurred while reading: %s", err);
});
};
read();
const packet: Packet = { type: "open" };
if (this.query.sid) {
packet.data = `{"sid":"${this.query.sid}"}`;
}
this.writer.write(packet).then(() => this.onOpen());
});
});
}
protected write(packets: Packet[]) {
this.writable = false;
for (let i = 0; i < packets.length; i++) {
const packet = packets[i];
const lastPacket = i === packets.length - 1;
this.writer.write(packet).then(() => {
if (lastPacket) {
nextTick(() => {
this.writable = true;
this.emitReserved("drain");
}, this.setTimeoutFn);
}
});
}
}
protected doClose() {
this.transport?.close();
}
}