Skip to content

Commit 96b58b5

Browse files
committed
Support default subprotocol
1 parent 9350f0f commit 96b58b5

File tree

2 files changed

+201
-22
lines changed

2 files changed

+201
-22
lines changed

packages/services/src/kernel/default.ts

+26-7
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import {
2121
KernelShellFutureHandler
2222
} from './future';
2323

24-
import * as serialize from './serialize';
24+
import { deserialize, serialize } from './serialize';
2525

2626
import * as validate from './validate';
2727
import { KernelSpec, KernelSpecAPI } from '../kernelspec';
@@ -407,7 +407,7 @@ export class KernelConnection implements Kernel.IKernelConnection {
407407
KernelMessage.isInfoRequestMsg(msg)
408408
) {
409409
if (this.connectionStatus === 'connected') {
410-
this._ws!.send(serialize.serialize(msg));
410+
this._ws!.send(serialize(msg, this._ws!.protocol));
411411
return;
412412
} else {
413413
throw new Error('Could not send message: status is not connected');
@@ -425,7 +425,7 @@ export class KernelConnection implements Kernel.IKernelConnection {
425425
this.connectionStatus === 'connected' &&
426426
this._kernelSession !== RESTARTING_KERNEL_SESSION
427427
) {
428-
this._ws!.send(serialize.serialize(msg));
428+
this._ws!.send(serialize(msg, this._ws!.protocol));
429429
} else if (queue) {
430430
this._pendingMessages.push(msg);
431431
} else {
@@ -1224,7 +1224,7 @@ export class KernelConnection implements Kernel.IKernelConnection {
12241224
/**
12251225
* Create the kernel websocket connection and add socket status handlers.
12261226
*/
1227-
private _createSocket = () => {
1227+
private _createSocket = (use_protocols = true) => {
12281228
this._errorIfDisposed();
12291229

12301230
// Make sure the socket is clear
@@ -1255,7 +1255,12 @@ export class KernelConnection implements Kernel.IKernelConnection {
12551255
url = url + `&token=${encodeURIComponent(token)}`;
12561256
}
12571257

1258-
this._ws = new settings.WebSocket(url, '0.0.1');
1258+
// Try opening the websocket with our list of subprotocols.
1259+
// If the server doesn't handle subprotocols, the accepted protocol will be ''.
1260+
// But we cannot send '' as a subprotocol, so if connection fails,
1261+
// reconnect without subprotocols.
1262+
const supported_protocols = use_protocols ? this._supported_protocols : [];
1263+
this._ws = new settings.WebSocket(url, supported_protocols);
12591264

12601265
// Ensure incoming binary messages are not Blobs
12611266
this._ws.binaryType = 'arraybuffer';
@@ -1477,7 +1482,8 @@ export class KernelConnection implements Kernel.IKernelConnection {
14771482
timeout / 1000
14781483
)} seconds.`
14791484
);
1480-
this._reconnectTimeout = setTimeout(this._createSocket, timeout);
1485+
// Try reconnection without subprotocols.
1486+
this._reconnectTimeout = setTimeout(this._createSocket, timeout, false);
14811487
this._reconnectAttempt += 1;
14821488
} else {
14831489
this._updateConnectionStatus('disconnected');
@@ -1502,6 +1508,18 @@ export class KernelConnection implements Kernel.IKernelConnection {
15021508
* Handle a websocket open event.
15031509
*/
15041510
private _onWSOpen = (evt: Event) => {
1511+
if (
1512+
this._ws!.protocol != '' &&
1513+
!this._supported_protocols.includes(this._ws!.protocol)
1514+
) {
1515+
console.log(
1516+
'Server selected unkown kernel wire protocol:',
1517+
this._ws!.protocol
1518+
);
1519+
console.log('Supported protocols are:', this._supported_protocols);
1520+
this._updateStatus('dead');
1521+
throw new Error('Unkown kernel wire protocol: ' + this._ws!.protocol);
1522+
}
15051523
this._ws!.onclose = this._onWSClose;
15061524
this._ws!.onerror = this._onWSClose;
15071525
this._updateConnectionStatus('connected');
@@ -1514,7 +1532,7 @@ export class KernelConnection implements Kernel.IKernelConnection {
15141532
// Notify immediately if there is an error with the message.
15151533
let msg: KernelMessage.IMessage;
15161534
try {
1517-
msg = serialize.deserialize(evt.data);
1535+
msg = deserialize(evt.data, this._ws!.protocol);
15181536
validate.validateMessage(msg);
15191537
} catch (error) {
15201538
error.message = `Kernel message validation error: ${error.message}`;
@@ -1574,6 +1592,7 @@ export class KernelConnection implements Kernel.IKernelConnection {
15741592
* Websocket to communicate with kernel.
15751593
*/
15761594
private _ws: WebSocket | null = null;
1595+
private _supported_protocols: string[] = ['0.0.1'];
15771596
private _username = '';
15781597
private _reconnectLimit = 7;
15791598
private _reconnectAttempt = 0;

packages/services/src/kernel/serialize.ts

+175-15
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,56 @@
33

44
import * as KernelMessage from './messages';
55

6+
/**
7+
* Serialize a kernel message for transport.
8+
*/
9+
export function serialize(
10+
msg: KernelMessage.IMessage,
11+
protocol: string
12+
): string | ArrayBuffer {
13+
switch (protocol) {
14+
case '0.0.1':
15+
return serialize_0_0_1(msg);
16+
default:
17+
return serialize_default(msg);
18+
}
19+
}
20+
621
/**
722
* Deserialize and return the unpacked message.
823
*/
9-
export function deserialize(binMsg: ArrayBuffer): KernelMessage.IMessage {
24+
export function deserialize(
25+
data: ArrayBuffer,
26+
protocol: string
27+
): KernelMessage.IMessage {
28+
switch (protocol) {
29+
case '0.0.1':
30+
return deserialize_0_0_1(data);
31+
default:
32+
return deserialize_default(data);
33+
}
34+
}
35+
36+
/**
37+
* Deserialize and return the unpacked message.
38+
* Protocol v0.0.1
39+
*/
40+
function deserialize_0_0_1(binMsg: ArrayBuffer): KernelMessage.IMessage {
1041
let msg: KernelMessage.IMessage;
1142
const data = new DataView(binMsg);
1243
const layoutLength = data.getUint16(0, true /* littleEndian */);
1344
const layoutBytes = new Uint8Array(binMsg.slice(2, 2 + layoutLength));
1445
const decoder = new TextDecoder('utf8');
1546
const layout = JSON.parse(decoder.decode(layoutBytes));
1647
const channel = layout.channel;
17-
let iter = getParts(new Uint8Array(binMsg.slice(2 + layoutLength)), layout.offsets);
48+
let iter = getParts(
49+
new Uint8Array(binMsg.slice(2 + layoutLength)),
50+
layout.offsets
51+
);
1852
const header = JSON.parse(decoder.decode(iter.next().value as Uint8Array));
19-
const parent_header = JSON.parse(decoder.decode(iter.next().value as Uint8Array));
53+
const parent_header = JSON.parse(
54+
decoder.decode(iter.next().value as Uint8Array)
55+
);
2056
const metadata = JSON.parse(decoder.decode(iter.next().value as Uint8Array));
2157
const content = JSON.parse(decoder.decode(iter.next().value as Uint8Array));
2258
let curr = iter.next();
@@ -31,58 +67,182 @@ export function deserialize(binMsg: ArrayBuffer): KernelMessage.IMessage {
3167
parent_header,
3268
metadata,
3369
content,
34-
buffers,
70+
buffers
3571
};
3672
return msg;
3773
}
3874

3975
/**
4076
* Serialize a kernel message for transport.
77+
* Protocol v0.0.1
4178
*/
42-
export function serialize(msg: KernelMessage.IMessage): ArrayBuffer {
79+
function serialize_0_0_1(msg: KernelMessage.IMessage): ArrayBuffer {
4380
const header = JSON.stringify(msg.header);
4481
const parent_header = JSON.stringify(msg.parent_header);
4582
const metadata = JSON.stringify(msg.metadata);
4683
const content = JSON.stringify(msg.content);
4784
let offsets = [];
4885
let curr_sum = 0;
49-
for (var length of [header.length, parent_header.length, metadata.length, content.length]) {
86+
for (let length of [
87+
header.length,
88+
parent_header.length,
89+
metadata.length,
90+
content.length
91+
]) {
5092
offsets.push(length + curr_sum);
5193
curr_sum += length;
5294
}
5395
let buffersLength = 0;
54-
const buffers: (ArrayBuffer | ArrayBufferView)[] = (msg.buffers !== undefined) ? msg.buffers : [];
55-
for (var buffer of buffers) {
56-
length = buffer.byteLength;
96+
const buffers: (ArrayBuffer | ArrayBufferView)[] =
97+
msg.buffers !== undefined ? msg.buffers : [];
98+
for (let buffer of buffers) {
99+
let length = buffer.byteLength;
57100
offsets.push(length + curr_sum);
58101
curr_sum += length;
59102
buffersLength += length;
60103
}
61104
const layoutJson = {
62105
channel: msg.channel,
63-
offsets,
106+
offsets
64107
};
65108
const layout = JSON.stringify(layoutJson);
66109
const layoutLength = new ArrayBuffer(2);
67-
new DataView(layoutLength).setInt16(0, layout.length, true /* littleEndian */);
110+
new DataView(layoutLength).setInt16(
111+
0,
112+
layout.length,
113+
true /* littleEndian */
114+
);
68115
const encoder = new TextEncoder();
69-
const binMsgNoBuff = encoder.encode(layout + header + parent_header + metadata + content);
116+
const binMsgNoBuff = encoder.encode(
117+
layout + header + parent_header + metadata + content
118+
);
70119
const binMsg = new Uint8Array(2 + binMsgNoBuff.byteLength + buffersLength);
71120
binMsg.set(new Uint8Array(layoutLength), 0);
72121
binMsg.set(new Uint8Array(binMsgNoBuff), 2);
73122
let pos = 2 + binMsgNoBuff.byteLength;
74-
for (var buffer of buffers) {
75-
binMsg.set(new Uint8Array(ArrayBuffer.isView(buffer) ? buffer.buffer : buffer), pos);
123+
for (let buffer of buffers) {
124+
binMsg.set(
125+
new Uint8Array(ArrayBuffer.isView(buffer) ? buffer.buffer : buffer),
126+
pos
127+
);
76128
pos += buffer.byteLength;
77129
}
78130
return binMsg.buffer;
79131
}
80132

81133
function* getParts(binMsg: Uint8Array, offsets: number[]) {
82134
let i0 = 0;
83-
for (var i1 of offsets) {
135+
for (let i1 of offsets) {
84136
yield binMsg.slice(i0, i1);
85137
i0 = i1;
86138
}
87139
yield binMsg.slice(i0);
88140
}
141+
142+
/**
143+
* Deserialize and return the unpacked message.
144+
* Default protocol
145+
*
146+
* #### Notes
147+
* Handles JSON blob strings and binary messages.
148+
*/
149+
function deserialize_default(
150+
data: ArrayBuffer | string
151+
): KernelMessage.IMessage {
152+
let value: KernelMessage.IMessage;
153+
if (typeof data === 'string') {
154+
value = JSON.parse(data);
155+
} else {
156+
value = deserializeBinary(data);
157+
}
158+
return value;
159+
}
160+
161+
/**
162+
* Serialize a kernel message for transport.
163+
* Default protocol
164+
*
165+
* #### Notes
166+
* If there is binary content, an `ArrayBuffer` is returned,
167+
* otherwise the message is converted to a JSON string.
168+
*/
169+
function serialize_default(msg: KernelMessage.IMessage): string | ArrayBuffer {
170+
let value: string | ArrayBuffer;
171+
if (msg.buffers?.length) {
172+
value = serializeBinary(msg);
173+
} else {
174+
value = JSON.stringify(msg);
175+
}
176+
return value;
177+
}
178+
179+
/**
180+
* Deserialize a binary message to a Kernel Message.
181+
*/
182+
function deserializeBinary(buf: ArrayBuffer): KernelMessage.IMessage {
183+
const data = new DataView(buf);
184+
// read the header: 1 + nbufs 32b integers
185+
const nbufs = data.getUint32(0);
186+
const offsets: number[] = [];
187+
if (nbufs < 2) {
188+
throw new Error('Invalid incoming Kernel Message');
189+
}
190+
for (let i = 1; i <= nbufs; i++) {
191+
offsets.push(data.getUint32(i * 4));
192+
}
193+
const jsonBytes = new Uint8Array(buf.slice(offsets[0], offsets[1]));
194+
const msg = JSON.parse(new TextDecoder('utf8').decode(jsonBytes));
195+
// the remaining chunks are stored as DataViews in msg.buffers
196+
msg.buffers = [];
197+
for (let i = 1; i < nbufs; i++) {
198+
const start = offsets[i];
199+
const stop = offsets[i + 1] || buf.byteLength;
200+
msg.buffers.push(new DataView(buf.slice(start, stop)));
201+
}
202+
return msg;
203+
}
204+
205+
/**
206+
* Implement the binary serialization protocol.
207+
*
208+
* Serialize Kernel message to ArrayBuffer.
209+
*/
210+
function serializeBinary(msg: KernelMessage.IMessage): ArrayBuffer {
211+
const offsets: number[] = [];
212+
const buffers: ArrayBuffer[] = [];
213+
const encoder = new TextEncoder();
214+
let origBuffers: (ArrayBuffer | ArrayBufferView)[] = [];
215+
if (msg.buffers !== undefined) {
216+
origBuffers = msg.buffers;
217+
delete msg['buffers'];
218+
}
219+
const jsonUtf8 = encoder.encode(JSON.stringify(msg));
220+
buffers.push(jsonUtf8.buffer);
221+
for (let i = 0; i < origBuffers.length; i++) {
222+
// msg.buffers elements could be either views or ArrayBuffers
223+
// buffers elements are ArrayBuffers
224+
const b: any = origBuffers[i];
225+
buffers.push(ArrayBuffer.isView(b) ? b.buffer : b);
226+
}
227+
const nbufs = buffers.length;
228+
offsets.push(4 * (nbufs + 1));
229+
for (let i = 0; i + 1 < buffers.length; i++) {
230+
offsets.push(offsets[offsets.length - 1] + buffers[i].byteLength);
231+
}
232+
const msgBuf = new Uint8Array(
233+
offsets[offsets.length - 1] + buffers[buffers.length - 1].byteLength
234+
);
235+
// use DataView.setUint32 for network byte-order
236+
const view = new DataView(msgBuf.buffer);
237+
// write nbufs to first 4 bytes
238+
view.setUint32(0, nbufs);
239+
// write offsets to next 4 * nbufs bytes
240+
for (let i = 0; i < offsets.length; i++) {
241+
view.setUint32(4 * (i + 1), offsets[i]);
242+
}
243+
// write all the buffers at their respective offsets
244+
for (let i = 0; i < buffers.length; i++) {
245+
msgBuf.set(new Uint8Array(buffers[i]), offsets[i]);
246+
}
247+
return msgBuf.buffer;
248+
}

0 commit comments

Comments
 (0)