Skip to content

Commit 271e2df

Browse files
feat: add an implementation based on uWebSockets.js
```js const { App } = require("uWebSockets.js"); const { uServer } = require("engine.io"); const app = new App(); const server = new uServer(); server.attach(app); app.listen(3000); ``` Reference: https://github.com/uNetworking/uWebSockets.js Related: #578
1 parent 37474c7 commit 271e2df

File tree

14 files changed

+1098
-203
lines changed

14 files changed

+1098
-203
lines changed

.github/workflows/ci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ jobs:
1212

1313
strategy:
1414
matrix:
15-
node-version: [10.x, 12.x, 14.x]
15+
node-version: [12.x, 14.x]
1616

1717
steps:
1818
- uses: actions/checkout@v2

lib/engine.io.ts

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import * as parser from "engine.io-parser";
55

66
export { Server, transports, listen, attach, parser };
77
export { AttachOptions, ServerOptions } from "./server";
8+
export { uServer } from "./userver";
89
export { Socket } from "./socket";
910
export { Transport } from "./transport";
1011
export const protocol = parser.protocol;

lib/server.ts

+149-130
Original file line numberDiff line numberDiff line change
@@ -113,15 +113,12 @@ export interface ServerOptions {
113113
allowEIO3?: boolean;
114114
}
115115

116-
export class Server extends EventEmitter {
116+
export abstract class BaseServer extends EventEmitter {
117117
public opts: ServerOptions;
118-
public httpServer?: HttpServer;
119118

120-
private clients: any;
119+
protected clients: any;
121120
private clientsCount: number;
122-
private ws: any;
123-
private corsMiddleware: Function;
124-
private perMessageDeflate: any;
121+
protected corsMiddleware: Function;
125122

126123
/**
127124
* Server constructor.
@@ -182,42 +179,7 @@ export class Server extends EventEmitter {
182179
this.init();
183180
}
184181

185-
/**
186-
* Initialize websocket server
187-
*
188-
* @api private
189-
*/
190-
private init() {
191-
if (!~this.opts.transports.indexOf("websocket")) return;
192-
193-
if (this.ws) this.ws.close();
194-
195-
this.ws = new this.opts.wsEngine({
196-
noServer: true,
197-
clientTracking: false,
198-
perMessageDeflate: this.opts.perMessageDeflate,
199-
maxPayload: this.opts.maxHttpBufferSize
200-
});
201-
202-
if (typeof this.ws.on === "function") {
203-
this.ws.on("headers", (headersArray, req) => {
204-
// note: 'ws' uses an array of headers, while Engine.IO uses an object (response.writeHead() accepts both formats)
205-
// we could also try to parse the array and then sync the values, but that will be error-prone
206-
const additionalHeaders = {};
207-
208-
const isInitialRequest = !req._query.sid;
209-
if (isInitialRequest) {
210-
this.emit("initial_headers", additionalHeaders, req);
211-
}
212-
213-
this.emit("headers", additionalHeaders, req);
214-
215-
Object.keys(additionalHeaders).forEach(key => {
216-
headersArray.push(`${key}: ${additionalHeaders[key]}`);
217-
});
218-
});
219-
}
220-
}
182+
protected abstract init();
221183

222184
/**
223185
* Returns a list of available transports for upgrade given a certain transport.
@@ -237,7 +199,7 @@ export class Server extends EventEmitter {
237199
* @return {Boolean} whether the request is valid
238200
* @api private
239201
*/
240-
private verify(req, upgrade, fn) {
202+
protected verify(req, upgrade, fn) {
241203
// transport check
242204
const transport = req._query.transport;
243205
if (!~this.opts.transports.indexOf(transport)) {
@@ -298,18 +260,6 @@ export class Server extends EventEmitter {
298260
fn();
299261
}
300262

301-
/**
302-
* Prepares a request by processing the query string.
303-
*
304-
* @api private
305-
*/
306-
private prepare(req) {
307-
// try to leverage pre-existing `req._query` (e.g: from connect)
308-
if (!req._query) {
309-
req._query = ~req.url.indexOf("?") ? qs.parse(parse(req.url).query) : {};
310-
}
311-
}
312-
313263
/**
314264
* Closes all clients.
315265
*
@@ -322,56 +272,11 @@ export class Server extends EventEmitter {
322272
this.clients[i].close(true);
323273
}
324274
}
325-
if (this.ws) {
326-
debug("closing webSocketServer");
327-
this.ws.close();
328-
// don't delete this.ws because it can be used again if the http server starts listening again
329-
}
275+
this.cleanup();
330276
return this;
331277
}
332278

333-
/**
334-
* Handles an Engine.IO HTTP request.
335-
*
336-
* @param {http.IncomingMessage} request
337-
* @param {http.ServerResponse|http.OutgoingMessage} response
338-
* @api public
339-
*/
340-
public handleRequest(req, res) {
341-
debug('handling "%s" http request "%s"', req.method, req.url);
342-
this.prepare(req);
343-
req.res = res;
344-
345-
const callback = (errorCode, errorContext) => {
346-
if (errorCode !== undefined) {
347-
this.emit("connection_error", {
348-
req,
349-
code: errorCode,
350-
message: Server.errorMessages[errorCode],
351-
context: errorContext
352-
});
353-
abortRequest(res, errorCode, errorContext);
354-
return;
355-
}
356-
357-
if (req._query.sid) {
358-
debug("setting new request for existing client");
359-
this.clients[req._query.sid].transport.onRequest(req);
360-
} else {
361-
const closeConnection = (errorCode, errorContext) =>
362-
abortRequest(res, errorCode, errorContext);
363-
this.handshake(req._query.transport, req, closeConnection);
364-
}
365-
};
366-
367-
if (this.corsMiddleware) {
368-
this.corsMiddleware.call(null, req, res, () => {
369-
this.verify(req, false, callback);
370-
});
371-
} else {
372-
this.verify(req, false, callback);
373-
}
374-
}
279+
protected abstract cleanup();
375280

376281
/**
377282
* generate a socket id.
@@ -391,9 +296,9 @@ export class Server extends EventEmitter {
391296
* @param {Object} request object
392297
* @param {Function} closeConnection
393298
*
394-
* @api private
299+
* @api protected
395300
*/
396-
private async handshake(transportName, req, closeConnection) {
301+
protected async handshake(transportName, req, closeConnection) {
397302
const protocol = req._query.EIO === "4" ? 4 : 3; // 3rd revision by default
398303
if (protocol === 3 && !this.opts.allowEIO3) {
399304
debug("unsupported protocol version");
@@ -431,7 +336,7 @@ export class Server extends EventEmitter {
431336
debug('handshaking client "%s"', id);
432337

433338
try {
434-
var transport = new transports[transportName](req);
339+
var transport = this.createTransport(transportName, req);
435340
if ("polling" === transportName) {
436341
transport.maxHttpBufferSize = this.opts.maxHttpBufferSize;
437342
transport.httpCompression = this.opts.httpCompression;
@@ -486,6 +391,141 @@ export class Server extends EventEmitter {
486391
});
487392

488393
this.emit("connection", socket);
394+
395+
return transport;
396+
}
397+
398+
protected abstract createTransport(transportName, req);
399+
400+
/**
401+
* Protocol errors mappings.
402+
*/
403+
404+
static errors = {
405+
UNKNOWN_TRANSPORT: 0,
406+
UNKNOWN_SID: 1,
407+
BAD_HANDSHAKE_METHOD: 2,
408+
BAD_REQUEST: 3,
409+
FORBIDDEN: 4,
410+
UNSUPPORTED_PROTOCOL_VERSION: 5
411+
};
412+
413+
static errorMessages = {
414+
0: "Transport unknown",
415+
1: "Session ID unknown",
416+
2: "Bad handshake method",
417+
3: "Bad request",
418+
4: "Forbidden",
419+
5: "Unsupported protocol version"
420+
};
421+
}
422+
423+
export class Server extends BaseServer {
424+
public httpServer?: HttpServer;
425+
private ws: any;
426+
427+
/**
428+
* Initialize websocket server
429+
*
430+
* @api protected
431+
*/
432+
protected init() {
433+
if (!~this.opts.transports.indexOf("websocket")) return;
434+
435+
if (this.ws) this.ws.close();
436+
437+
this.ws = new this.opts.wsEngine({
438+
noServer: true,
439+
clientTracking: false,
440+
perMessageDeflate: this.opts.perMessageDeflate,
441+
maxPayload: this.opts.maxHttpBufferSize
442+
});
443+
444+
if (typeof this.ws.on === "function") {
445+
this.ws.on("headers", (headersArray, req) => {
446+
// note: 'ws' uses an array of headers, while Engine.IO uses an object (response.writeHead() accepts both formats)
447+
// we could also try to parse the array and then sync the values, but that will be error-prone
448+
const additionalHeaders = {};
449+
450+
const isInitialRequest = !req._query.sid;
451+
if (isInitialRequest) {
452+
this.emit("initial_headers", additionalHeaders, req);
453+
}
454+
455+
this.emit("headers", additionalHeaders, req);
456+
457+
Object.keys(additionalHeaders).forEach(key => {
458+
headersArray.push(`${key}: ${additionalHeaders[key]}`);
459+
});
460+
});
461+
}
462+
}
463+
464+
protected cleanup() {
465+
if (this.ws) {
466+
debug("closing webSocketServer");
467+
this.ws.close();
468+
// don't delete this.ws because it can be used again if the http server starts listening again
469+
}
470+
}
471+
472+
/**
473+
* Prepares a request by processing the query string.
474+
*
475+
* @api private
476+
*/
477+
private prepare(req) {
478+
// try to leverage pre-existing `req._query` (e.g: from connect)
479+
if (!req._query) {
480+
req._query = ~req.url.indexOf("?") ? qs.parse(parse(req.url).query) : {};
481+
}
482+
}
483+
484+
protected createTransport(transportName, req) {
485+
return new transports[transportName](req);
486+
}
487+
488+
/**
489+
* Handles an Engine.IO HTTP request.
490+
*
491+
* @param {http.IncomingMessage} request
492+
* @param {http.ServerResponse|http.OutgoingMessage} response
493+
* @api public
494+
*/
495+
public handleRequest(req, res) {
496+
debug('handling "%s" http request "%s"', req.method, req.url);
497+
this.prepare(req);
498+
req.res = res;
499+
500+
const callback = (errorCode, errorContext) => {
501+
if (errorCode !== undefined) {
502+
this.emit("connection_error", {
503+
req,
504+
code: errorCode,
505+
message: Server.errorMessages[errorCode],
506+
context: errorContext
507+
});
508+
abortRequest(res, errorCode, errorContext);
509+
return;
510+
}
511+
512+
if (req._query.sid) {
513+
debug("setting new request for existing client");
514+
this.clients[req._query.sid].transport.onRequest(req);
515+
} else {
516+
const closeConnection = (errorCode, errorContext) =>
517+
abortRequest(res, errorCode, errorContext);
518+
this.handshake(req._query.transport, req, closeConnection);
519+
}
520+
};
521+
522+
if (this.corsMiddleware) {
523+
this.corsMiddleware.call(null, req, res, () => {
524+
this.verify(req, false, callback);
525+
});
526+
} else {
527+
this.verify(req, false, callback);
528+
}
489529
}
490530

491531
/**
@@ -559,13 +599,13 @@ export class Server extends EventEmitter {
559599
// transport error handling takes over
560600
websocket.removeListener("error", onUpgradeError);
561601

562-
const transport = new transports[req._query.transport](req);
602+
const transport = this.createTransport(req._query.transport, req);
563603
if (req._query && req._query.b64) {
564604
transport.supportsBinary = false;
565605
} else {
566606
transport.supportsBinary = true;
567607
}
568-
transport.perMessageDeflate = this.perMessageDeflate;
608+
transport.perMessageDeflate = this.opts.perMessageDeflate;
569609
client.maybeUpgrade(transport);
570610
}
571611
} else {
@@ -590,7 +630,7 @@ export class Server extends EventEmitter {
590630
* @param {Object} options
591631
* @api public
592632
*/
593-
public attach(server, options: AttachOptions = {}) {
633+
public attach(server: HttpServer, options: AttachOptions = {}) {
594634
let path = (options.path || "/engine.io").replace(/\/$/, "");
595635

596636
const destroyUpgradeTimeout = options.destroyUpgradeTimeout || 1000;
@@ -632,6 +672,7 @@ export class Server extends EventEmitter {
632672
// and if no eio thing handles the upgrade
633673
// then the socket needs to die!
634674
setTimeout(function() {
675+
// @ts-ignore
635676
if (socket.writable && socket.bytesWritten <= 0) {
636677
return socket.end();
637678
}
@@ -640,28 +681,6 @@ export class Server extends EventEmitter {
640681
});
641682
}
642683
}
643-
644-
/**
645-
* Protocol errors mappings.
646-
*/
647-
648-
static errors = {
649-
UNKNOWN_TRANSPORT: 0,
650-
UNKNOWN_SID: 1,
651-
BAD_HANDSHAKE_METHOD: 2,
652-
BAD_REQUEST: 3,
653-
FORBIDDEN: 4,
654-
UNSUPPORTED_PROTOCOL_VERSION: 5
655-
};
656-
657-
static errorMessages = {
658-
0: "Transport unknown",
659-
1: "Session ID unknown",
660-
2: "Bad handshake method",
661-
3: "Bad request",
662-
4: "Forbidden",
663-
5: "Unsupported protocol version"
664-
};
665684
}
666685

667686
/**

0 commit comments

Comments
 (0)