Skip to content

Commit d3388bf

Browse files
feat: add two subscription modes for the sharded adapter
The subscriptionMode option allows to configure how many Redis Pub/Sub channels are used: - "static": 2 channels per namespace Useful when used with dynamic namespaces. - "dynamic": (2 + 1 per public room) channels per namespace The default value, useful when some rooms have a low number of clients (so only a few Socket.IO servers are notified). Related: - #491 - #492 - #493
1 parent ef51d69 commit d3388bf

File tree

4 files changed

+110
-29
lines changed

4 files changed

+110
-29
lines changed

lib/sharded-adapter.ts

Lines changed: 90 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,42 @@ const debug = debugModule("socket.io-redis");
88
const RETURN_BUFFERS = true;
99

1010
export interface ShardedRedisAdapterOptions {
11+
/**
12+
* The prefix for the Redis Pub/Sub channels.
13+
*
14+
* @default "socket.io"
15+
*/
1116
channelPrefix?: string;
17+
/**
18+
* The subscription mode impacts the number of Redis Pub/Sub channels:
19+
*
20+
* - "static": 2 channels per namespace
21+
*
22+
* Useful when used with dynamic namespaces.
23+
*
24+
* - "dynamic": (2 + 1 per public room) channels per namespace
25+
*
26+
* The default value, useful when some rooms have a low number of clients (so only a few Socket.IO servers are notified).
27+
*
28+
* Only public rooms (i.e. not related to a particular Socket ID) are taken in account, because:
29+
*
30+
* - a lot of connected clients would mean a lot of subscription/unsubscription
31+
* - the Socket ID attribute is ephemeral
32+
*
33+
* @default "dynamic"
34+
*/
35+
subscriptionMode?: "static" | "dynamic";
1236
}
1337

38+
/**
39+
* Create a new Adapter based on Redis sharded Pub/Sub introduced in Redis 7.0.
40+
*
41+
* @see https://redis.io/docs/manual/pubsub/#sharded-pubsub
42+
*
43+
* @param pubClient - the Redis client used to publish (from the `redis` package)
44+
* @param subClient - the Redis client used to subscribe (from the `redis` package)
45+
* @param opts - some additional options
46+
*/
1447
export function createShardedAdapter(
1548
pubClient: any,
1649
subClient: any,
@@ -36,6 +69,7 @@ class ShardedRedisAdapter extends ClusterAdapter {
3669
this.opts = Object.assign(
3770
{
3871
channelPrefix: "socket.io",
72+
subscriptionMode: "dynamic",
3973
},
4074
opts
4175
);
@@ -48,25 +82,69 @@ class ShardedRedisAdapter extends ClusterAdapter {
4882
this.subClient.sSubscribe(this.channel, handler, RETURN_BUFFERS);
4983
this.subClient.sSubscribe(this.responseChannel, handler, RETURN_BUFFERS);
5084

51-
this.cleanup = () => {
52-
return Promise.all([
53-
this.subClient.sUnsubscribe(this.channel, handler),
54-
this.subClient.sUnsubscribe(this.responseChannel, handler),
55-
]);
56-
};
85+
if (this.opts.subscriptionMode === "dynamic") {
86+
this.on("create-room", (room) => {
87+
const isPublicRoom = !this.sids.has(room);
88+
if (isPublicRoom) {
89+
this.subClient.sSubscribe(
90+
this.dynamicChannel(room),
91+
handler,
92+
RETURN_BUFFERS
93+
);
94+
}
95+
});
96+
97+
this.on("delete-room", (room) => {
98+
const isPublicRoom = !this.sids.has(room);
99+
if (isPublicRoom) {
100+
this.subClient.sUnsubscribe(this.dynamicChannel(room));
101+
}
102+
});
103+
}
57104
}
58105

59106
override close(): Promise<void> | void {
60-
this.cleanup();
107+
const channels = [this.channel, this.responseChannel];
108+
109+
if (this.opts.subscriptionMode === "dynamic") {
110+
this.rooms.forEach((_sids, room) => {
111+
const isPublicRoom = !this.sids.has(room);
112+
if (isPublicRoom) {
113+
channels.push(this.dynamicChannel(room));
114+
}
115+
});
116+
}
117+
118+
return this.subClient.sUnsubscribe(channels);
61119
}
62120

63121
override publishMessage(message) {
64-
debug("publishing message of type %s to %s", message.type, this.channel);
65-
this.pubClient.sPublish(this.channel, this.encode(message));
122+
const channel = this.computeChannel(message);
123+
debug("publishing message of type %s to %s", message.type, channel);
124+
this.pubClient.sPublish(channel, this.encode(message));
66125

67126
return Promise.resolve("");
68127
}
69128

129+
private computeChannel(message) {
130+
// broadcast with ack can not use a dynamic channel, because the serverCount() method return the number of all
131+
// servers, not only the ones where the given room exists
132+
const useDynamicChannel =
133+
this.opts.subscriptionMode === "dynamic" &&
134+
message.type === MessageType.BROADCAST &&
135+
message.data.requestId === undefined &&
136+
message.data.opts.rooms.length === 1;
137+
if (useDynamicChannel) {
138+
return this.dynamicChannel(message.data.opts.rooms[0]);
139+
} else {
140+
return this.channel;
141+
}
142+
}
143+
144+
private dynamicChannel(room) {
145+
return this.channel + room + "#";
146+
}
147+
70148
override publishResponse(requesterUid, response) {
71149
debug("publishing response of type %s to %s", response.type, requesterUid);
72150

@@ -104,10 +182,10 @@ class ShardedRedisAdapter extends ClusterAdapter {
104182
return debug("invalid format: %s", e.message);
105183
}
106184

107-
if (channel.toString() === this.channel) {
108-
this.onMessage(message, "");
109-
} else {
185+
if (channel.toString() === this.responseChannel) {
110186
this.onResponse(message);
187+
} else {
188+
this.onMessage(message);
111189
}
112190
}
113191

package-lock.json

Lines changed: 15 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
"mocha": "^10.1.0",
4141
"nyc": "^15.1.0",
4242
"prettier": "^2.8.7",
43-
"redis": "^4.6.5",
43+
"redis": "^4.6.6",
4444
"redis-v3": "npm:redis@^3.1.2",
4545
"socket.io": "^4.6.1",
4646
"socket.io-client": "^4.1.1",

test/specifics.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ describe("specifics", () => {
2323
afterEach(() => cleanup());
2424

2525
describe("broadcast", function () {
26-
it("broadcasts to a numeric room", (done) => {
26+
it("broadcasts to a numeric room", function (done) {
27+
if (process.env.SHARDED === "1") {
28+
return this.skip();
29+
}
2730
// @ts-ignore
2831
serverSockets[0].join(123);
2932

0 commit comments

Comments
 (0)