Skip to content

Commit 184f3cf

Browse files
feat: add promise-based acknowledgements
This commit adds some syntactic sugar around acknowledgements: - `emitWithAck()` ```js try { const responses = await io.timeout(1000).emitWithAck("some-event"); console.log(responses); // one response per client } catch (e) { // some clients did not acknowledge the event in the given delay } io.on("connection", async (socket) => { // without timeout const response = await socket.emitWithAck("hello", "world"); // with a specific timeout try { const response = await socket.timeout(1000).emitWithAck("hello", "world"); } catch (err) { // the client did not acknowledge the event in the given delay } }); ``` - `serverSideEmitWithAck()` ```js try { const responses = await io.timeout(1000).serverSideEmitWithAck("some-event"); console.log(responses); // one response per server (except itself) } catch (e) { // some servers did not acknowledge the event in the given delay } ``` Related: - #4175 - #4577 - #4583
1 parent 5d9220b commit 184f3cf

9 files changed

+391
-7
lines changed

Diff for: lib/broadcast-operator.ts

+33-1
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ import type {
77
EventNames,
88
EventsMap,
99
TypedEventBroadcaster,
10-
DecorateAcknowledgements,
1110
DecorateAcknowledgementsWithTimeoutAndMultipleResponses,
11+
AllButLast,
12+
Last,
13+
SecondArg,
1214
} from "./typed-events";
1315

1416
export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
@@ -276,6 +278,36 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
276278
return true;
277279
}
278280

281+
/**
282+
* Emits an event and waits for an acknowledgement from all clients.
283+
*
284+
* @example
285+
* try {
286+
* const responses = await io.timeout(1000).emitWithAck("some-event");
287+
* console.log(responses); // one response per client
288+
* } catch (e) {
289+
* // some clients did not acknowledge the event in the given delay
290+
* }
291+
*
292+
* @return a Promise that will be fulfilled when all clients have acknowledged the event
293+
*/
294+
public emitWithAck<Ev extends EventNames<EmitEvents>>(
295+
ev: Ev,
296+
...args: AllButLast<EventParams<EmitEvents, Ev>>
297+
): Promise<SecondArg<Last<EventParams<EmitEvents, Ev>>>> {
298+
return new Promise((resolve, reject) => {
299+
args.push((err, responses) => {
300+
if (err) {
301+
err.responses = responses;
302+
return reject(err);
303+
} else {
304+
return resolve(responses);
305+
}
306+
});
307+
this.emit(ev, ...(args as any[] as EventParams<EmitEvents, Ev>));
308+
});
309+
}
310+
279311
/**
280312
* Gets a list of clients.
281313
*

Diff for: lib/index.ts

+49-3
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,11 @@ import {
3434
EventParams,
3535
StrictEventEmitter,
3636
EventNames,
37-
DecorateAcknowledgements,
3837
DecorateAcknowledgementsWithTimeoutAndMultipleResponses,
38+
AllButLast,
39+
Last,
40+
FirstArg,
41+
SecondArg,
3942
} from "./typed-events";
4043
import { patchAdapter, restoreAdapter, serveFile } from "./uws";
4144
import type { BaseServer } from "engine.io/build/server";
@@ -811,6 +814,26 @@ export class Server<
811814
return this.sockets.except(room);
812815
}
813816

817+
/**
818+
* Emits an event and waits for an acknowledgement from all clients.
819+
*
820+
* @example
821+
* try {
822+
* const responses = await io.timeout(1000).emitWithAck("some-event");
823+
* console.log(responses); // one response per client
824+
* } catch (e) {
825+
* // some clients did not acknowledge the event in the given delay
826+
* }
827+
*
828+
* @return a Promise that will be fulfilled when all clients have acknowledged the event
829+
*/
830+
public emitWithAck<Ev extends EventNames<EmitEvents>>(
831+
ev: Ev,
832+
...args: AllButLast<EventParams<EmitEvents, Ev>>
833+
): Promise<SecondArg<Last<EventParams<EmitEvents, Ev>>>> {
834+
return this.sockets.emitWithAck(ev, ...args);
835+
}
836+
814837
/**
815838
* Sends a `message` event to all clients.
816839
*
@@ -854,9 +877,9 @@ export class Server<
854877
* // acknowledgements (without binary content) are supported too:
855878
* io.serverSideEmit("ping", (err, responses) => {
856879
* if (err) {
857-
* // some clients did not acknowledge the event in the given delay
880+
* // some servers did not acknowledge the event in the given delay
858881
* } else {
859-
* console.log(responses); // one response per client
882+
* console.log(responses); // one response per server (except the current one)
860883
* }
861884
* });
862885
*
@@ -877,6 +900,29 @@ export class Server<
877900
return this.sockets.serverSideEmit(ev, ...args);
878901
}
879902

903+
/**
904+
* Sends a message and expect an acknowledgement from the other Socket.IO servers of the cluster.
905+
*
906+
* @example
907+
* try {
908+
* const responses = await io.serverSideEmitWithAck("ping");
909+
* console.log(responses); // one response per server (except the current one)
910+
* } catch (e) {
911+
* // some servers did not acknowledge the event in the given delay
912+
* }
913+
*
914+
* @param ev - the event name
915+
* @param args - an array of arguments
916+
*
917+
* @return a Promise that will be fulfilled when all servers have acknowledged the event
918+
*/
919+
public serverSideEmitWithAck<Ev extends EventNames<ServerSideEvents>>(
920+
ev: Ev,
921+
...args: AllButLast<EventParams<ServerSideEvents, Ev>>
922+
): Promise<FirstArg<Last<EventParams<ServerSideEvents, Ev>>>[]> {
923+
return this.sockets.serverSideEmitWithAck(ev, ...args);
924+
}
925+
880926
/**
881927
* Gets a list of socket ids.
882928
*

Diff for: lib/namespace.ts

+68-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ import {
77
StrictEventEmitter,
88
DefaultEventsMap,
99
DecorateAcknowledgementsWithTimeoutAndMultipleResponses,
10+
AllButLast,
11+
Last,
12+
FirstArg,
13+
SecondArg,
1014
} from "./typed-events";
1115
import type { Client } from "./client";
1216
import debugModule from "debug";
@@ -433,6 +437,30 @@ export class Namespace<
433437
);
434438
}
435439

440+
/**
441+
* Emits an event and waits for an acknowledgement from all clients.
442+
*
443+
* @example
444+
* const myNamespace = io.of("/my-namespace");
445+
*
446+
* try {
447+
* const responses = await myNamespace.timeout(1000).emitWithAck("some-event");
448+
* console.log(responses); // one response per client
449+
* } catch (e) {
450+
* // some clients did not acknowledge the event in the given delay
451+
* }
452+
*
453+
* @return a Promise that will be fulfilled when all clients have acknowledged the event
454+
*/
455+
public emitWithAck<Ev extends EventNames<EmitEvents>>(
456+
ev: Ev,
457+
...args: AllButLast<EventParams<EmitEvents, Ev>>
458+
): Promise<SecondArg<Last<EventParams<EmitEvents, Ev>>>> {
459+
return new BroadcastOperator<EmitEvents, SocketData>(
460+
this.adapter
461+
).emitWithAck(ev, ...args);
462+
}
463+
436464
/**
437465
* Sends a `message` event to all clients.
438466
*
@@ -480,9 +508,9 @@ export class Namespace<
480508
* // acknowledgements (without binary content) are supported too:
481509
* myNamespace.serverSideEmit("ping", (err, responses) => {
482510
* if (err) {
483-
* // some clients did not acknowledge the event in the given delay
511+
* // some servers did not acknowledge the event in the given delay
484512
* } else {
485-
* console.log(responses); // one response per client
513+
* console.log(responses); // one response per server (except the current one)
486514
* }
487515
* });
488516
*
@@ -508,6 +536,44 @@ export class Namespace<
508536
return true;
509537
}
510538

539+
/**
540+
* Sends a message and expect an acknowledgement from the other Socket.IO servers of the cluster.
541+
*
542+
* @example
543+
* const myNamespace = io.of("/my-namespace");
544+
*
545+
* try {
546+
* const responses = await myNamespace.serverSideEmitWithAck("ping");
547+
* console.log(responses); // one response per server (except the current one)
548+
* } catch (e) {
549+
* // some servers did not acknowledge the event in the given delay
550+
* }
551+
*
552+
* @param ev - the event name
553+
* @param args - an array of arguments
554+
*
555+
* @return a Promise that will be fulfilled when all servers have acknowledged the event
556+
*/
557+
public serverSideEmitWithAck<Ev extends EventNames<ServerSideEvents>>(
558+
ev: Ev,
559+
...args: AllButLast<EventParams<ServerSideEvents, Ev>>
560+
): Promise<FirstArg<Last<EventParams<ServerSideEvents, Ev>>>[]> {
561+
return new Promise((resolve, reject) => {
562+
args.push((err, responses) => {
563+
if (err) {
564+
err.responses = responses;
565+
return reject(err);
566+
} else {
567+
return resolve(responses);
568+
}
569+
});
570+
this.serverSideEmit(
571+
ev,
572+
...(args as any[] as EventParams<ServerSideEvents, Ev>)
573+
);
574+
});
575+
}
576+
511577
/**
512578
* Called when a packet is received from another Socket.IO server
513579
*

Diff for: lib/socket.ts

+39
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ import { Packet, PacketType } from "socket.io-parser";
22
import debugModule from "debug";
33
import type { Server } from "./index";
44
import {
5+
AllButLast,
56
DecorateAcknowledgements,
67
DecorateAcknowledgementsWithMultipleResponses,
78
DefaultEventsMap,
89
EventNames,
910
EventParams,
1011
EventsMap,
12+
FirstArg,
13+
Last,
1114
StrictEventEmitter,
1215
} from "./typed-events";
1316
import type { Client } from "./client";
@@ -357,6 +360,42 @@ export class Socket<
357360
return true;
358361
}
359362

363+
/**
364+
* Emits an event and waits for an acknowledgement
365+
*
366+
* @example
367+
* io.on("connection", async (socket) => {
368+
* // without timeout
369+
* const response = await socket.emitWithAck("hello", "world");
370+
*
371+
* // with a specific timeout
372+
* try {
373+
* const response = await socket.timeout(1000).emitWithAck("hello", "world");
374+
* } catch (err) {
375+
* // the client did not acknowledge the event in the given delay
376+
* }
377+
* });
378+
*
379+
* @return a Promise that will be fulfilled when the client acknowledges the event
380+
*/
381+
public emitWithAck<Ev extends EventNames<EmitEvents>>(
382+
ev: Ev,
383+
...args: AllButLast<EventParams<EmitEvents, Ev>>
384+
): Promise<FirstArg<Last<EventParams<EmitEvents, Ev>>>> {
385+
// the timeout flag is optional
386+
const withErr = this.flags.timeout !== undefined;
387+
return new Promise((resolve, reject) => {
388+
args.push((arg1, arg2) => {
389+
if (withErr) {
390+
return arg1 ? reject(arg1) : resolve(arg2);
391+
} else {
392+
return resolve(arg1);
393+
}
394+
});
395+
this.emit(ev, ...(args as any[] as EventParams<EmitEvents, Ev>));
396+
});
397+
}
398+
360399
/**
361400
* @private
362401
*/

Diff for: lib/typed-events.ts

+14
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,20 @@ export abstract class StrictEventEmitter<
179179
}
180180
}
181181

182+
export type Last<T extends any[]> = T extends [...infer H, infer L] ? L : any;
183+
export type AllButLast<T extends any[]> = T extends [...infer H, infer L]
184+
? H
185+
: any[];
186+
export type FirstArg<T> = T extends (arg: infer Param) => infer Result
187+
? Param
188+
: any;
189+
export type SecondArg<T> = T extends (
190+
err: Error,
191+
arg: infer Param
192+
) => infer Result
193+
? Param
194+
: any;
195+
182196
type PrependTimeoutError<T extends any[]> = {
183197
[K in keyof T]: T[K] extends (...args: infer Params) => infer Result
184198
? (err: Error, ...args: Params) => Result

Diff for: test/messaging-many.ts

+68
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,74 @@ describe("messaging many", () => {
471471
});
472472
});
473473

474+
it("should broadcast and expect multiple acknowledgements (promise)", (done) => {
475+
const io = new Server(0);
476+
const socket1 = createClient(io, "/", { multiplex: false });
477+
const socket2 = createClient(io, "/", { multiplex: false });
478+
const socket3 = createClient(io, "/", { multiplex: false });
479+
480+
socket1.on("some event", (cb) => {
481+
cb(1);
482+
});
483+
484+
socket2.on("some event", (cb) => {
485+
cb(2);
486+
});
487+
488+
socket3.on("some event", (cb) => {
489+
cb(3);
490+
});
491+
492+
Promise.all([
493+
waitFor(socket1, "connect"),
494+
waitFor(socket2, "connect"),
495+
waitFor(socket3, "connect"),
496+
]).then(async () => {
497+
const responses = await io.timeout(2000).emitWithAck("some event");
498+
expect(responses).to.contain(1, 2, 3);
499+
500+
success(done, io, socket1, socket2, socket3);
501+
});
502+
});
503+
504+
it("should fail when a client does not acknowledge the event in the given delay (promise)", (done) => {
505+
const io = new Server(0);
506+
const socket1 = createClient(io, "/", { multiplex: false });
507+
const socket2 = createClient(io, "/", { multiplex: false });
508+
const socket3 = createClient(io, "/", { multiplex: false });
509+
510+
socket1.on("some event", (cb) => {
511+
cb(1);
512+
});
513+
514+
socket2.on("some event", (cb) => {
515+
cb(2);
516+
});
517+
518+
socket3.on("some event", () => {
519+
// timeout
520+
});
521+
522+
Promise.all([
523+
waitFor(socket1, "connect"),
524+
waitFor(socket2, "connect"),
525+
waitFor(socket3, "connect"),
526+
]).then(async () => {
527+
try {
528+
await io.timeout(200).emitWithAck("some event");
529+
expect.fail();
530+
} catch (err) {
531+
expect(err).to.be.an(Error);
532+
// @ts-ignore
533+
expect(err.responses).to.have.length(2);
534+
// @ts-ignore
535+
expect(err.responses).to.contain(1, 2);
536+
537+
success(done, io, socket1, socket2, socket3);
538+
}
539+
});
540+
});
541+
474542
it("should broadcast and return if the packet is sent to 0 client", (done) => {
475543
const io = new Server(0);
476544
const socket1 = createClient(io, "/", { multiplex: false });

0 commit comments

Comments
 (0)