Skip to content

Commit 9f9be70

Browse files
toger5hughns
andauthored
MatrixRTC: New membership manager (#4726)
* WIP doodles on MembershipManager test cases * . * initial membership manager test setup. * Updates from discussion * revert renaming comments * remove unused import * fix leave delayed event resend test. It was missing a flush. * comment out and remove unused variables * es lint * use jsdom instead of node test environment * remove unused variables * remove unused export * temp * review * fixup tests * more review * remove wait for expect dependency * temp * fix wrong mocked meberhsip template * rename MembershipManager -> LegacyMembershipManager And remove the IMembershipManager from it * Add new memberhsip manager * fix tests to be compatible with old and new membership manager * Comment cleanup * Allow join to throw - Add tests for throwing cases - Fixs based on tests * introduce membershipExpiryTimeoutSlack * more detailed comments and cleanup * warn if slack is misconfigured and use default values instead * fix action resets. * flatten MembershipManager.spec.ts * rename testEnvironment to memberManagerTestEnvironment * allow configuring Legacy manager in the matrixRTC session * deprecate LegacyMembershipManager * remove usage of waitForExpect * flatten tests and add comments * clean up leave logic branch * add more leave test cases * use defer * review ("Some minor tidying things for now.") * add onError for join method and cleanup * use pop instead of filter * fixes * simplify error handling and MembershipAction Only use one membership action enum * Add diagram * fix new error api in rtc session * fix up retry counter * fix lints * make unrecoverable errors more explicit * fix tests * Allow multiple retries on the rtc state event http requests. * use then catch for startup * no try catch 1 * update expire headroom logic transition from try catch to .then .catch * replace flushPromise with advanceTimersByTimeAsync * fix leaving special cases * more unrecoverable errors special cases * move to MatrixRTCSessionManager logger * add state reset and add another unhandleable error The error occurs if we want to cancel the delayed event we still have an id for but get a non expected error. * missed review fixes * remove @jest/environment dependency * Cleanup awaits and Make mock types more correct. Make every mock return a Promise if the real implementation does return a pormise. * remove flush promise dependency * fix not recreating default state on reset This broke all tests since we only created the state once and than passed by ref * Use per action rate limit and retry counter There can be multiple retries at once so we need to store counters per action e.g. the send update membership and the restart delayed could be rate limited at the same time. * add linting to matrixrtc tests * Add fix async lints and use matrix rtc logger for test environment. * prettier * review step 1 * change to MatrixRTCSession logger * review step 2 * make LoopHandler Private * update config to use NewManager wording * emit error on rtc session if the membership manager encounters one * network error and throw refactor * make accessing the full room deprecated * remove deprecated usage of full room * Clean up the deprecation * add network error handler and cleanup * better logging, another test, make maximumNetworkErrorRetryCount configurable * more logging & refactor leave promise * add ConnectionError as possible retry cause * Make it work in embedded mode with a server that does not support delayed events * review iteration 1 * review iteration 2 * first step in improving widget error handling * make the embedded client throw ConnectionErrors where desired. * fix tests * delayed event sending widget mode stop gap fix. * improve comment * fix unrecoverable error joinState (and add JoinStateChanged) emission. * check that we do not add multipe sendFirstDelayed Events * also check insertions queue * always log "Missing own membership: force re-join" * Do not update the membership if we are in any (a later) state of sending our own state. The scheduled states MembershipActionType.SendFirstDelayedEvent and MembershipActionType.SendJoinEvent both imply that we are already trying to send our own membership state event. * make leave reset actually stop the manager. The reset case was not covered properly. There are cases where it is not allowed to add additional events after a reset and cases where we want to add more events after the reset. We need to allow this as a reset property. * fix tests (and implementation) * Allow MembershipManger to be set at runtime via JoinConfig.membershipManagerFactory * Map actions into status as a sanity check * Log status change after applying actions * Add todo * Cleanup * Log transition from earlier status * remove redundant status implementation also add TODO comment to not forget about this. * More cleanup * Consider insertions in status() * Log duration for emitting MatrixRTCSessionEvent.MembershipsChanged * add another valid condition for connected * some TODO cleanup * review add warning when using addAction while the scheduler is not running. * es lint * refactor to return based handler approach (remove insertions array) * refactor: Move action scheduler * refactor: move different handler cases into separate functions * linter * review: delayed events endpoint error * review * Suggestions from pair review * resetState is actually only used internally * Revert "resetState is actually only used internally" This reverts commit 6af4730. * refactor: running is part of the scheduler (not state) * refactor: move everything state related from schduler to manager. * review * Update src/matrixrtc/NewMembershipManager.ts Co-authored-by: Hugh Nimmo-Smith <[email protected]> * review * public -> private + missed review fiexes (comment typos) --------- Co-authored-by: Hugh Nimmo-Smith <[email protected]> Co-authored-by: Hugh Nimmo-Smith <[email protected]>
1 parent f552370 commit 9f9be70

11 files changed

+1373
-112
lines changed

Diff for: spec/unit/embedded.spec.ts

+20-20
Original file line numberDiff line numberDiff line change
@@ -49,26 +49,26 @@ const testOIDCToken = {
4949
token_type: "Bearer",
5050
};
5151
class MockWidgetApi extends EventEmitter {
52-
public start = jest.fn();
53-
public requestCapability = jest.fn();
54-
public requestCapabilities = jest.fn();
55-
public requestCapabilityForRoomTimeline = jest.fn();
56-
public requestCapabilityToSendEvent = jest.fn();
57-
public requestCapabilityToReceiveEvent = jest.fn();
58-
public requestCapabilityToSendMessage = jest.fn();
59-
public requestCapabilityToReceiveMessage = jest.fn();
60-
public requestCapabilityToSendState = jest.fn();
61-
public requestCapabilityToReceiveState = jest.fn();
62-
public requestCapabilityToSendToDevice = jest.fn();
63-
public requestCapabilityToReceiveToDevice = jest.fn();
52+
public start = jest.fn().mockResolvedValue(undefined);
53+
public requestCapability = jest.fn().mockResolvedValue(undefined);
54+
public requestCapabilities = jest.fn().mockResolvedValue(undefined);
55+
public requestCapabilityForRoomTimeline = jest.fn().mockResolvedValue(undefined);
56+
public requestCapabilityToSendEvent = jest.fn().mockResolvedValue(undefined);
57+
public requestCapabilityToReceiveEvent = jest.fn().mockResolvedValue(undefined);
58+
public requestCapabilityToSendMessage = jest.fn().mockResolvedValue(undefined);
59+
public requestCapabilityToReceiveMessage = jest.fn().mockResolvedValue(undefined);
60+
public requestCapabilityToSendState = jest.fn().mockResolvedValue(undefined);
61+
public requestCapabilityToReceiveState = jest.fn().mockResolvedValue(undefined);
62+
public requestCapabilityToSendToDevice = jest.fn().mockResolvedValue(undefined);
63+
public requestCapabilityToReceiveToDevice = jest.fn().mockResolvedValue(undefined);
6464
public sendRoomEvent = jest.fn(
65-
(eventType: string, content: unknown, roomId?: string, delay?: number, parentDelayId?: string) =>
65+
async (eventType: string, content: unknown, roomId?: string, delay?: number, parentDelayId?: string) =>
6666
delay === undefined && parentDelayId === undefined
6767
? { event_id: `$${Math.random()}` }
6868
: { delay_id: `id-${Math.random()}` },
6969
);
7070
public sendStateEvent = jest.fn(
71-
(
71+
async (
7272
eventType: string,
7373
stateKey: string,
7474
content: unknown,
@@ -80,17 +80,17 @@ class MockWidgetApi extends EventEmitter {
8080
? { event_id: `$${Math.random()}` }
8181
: { delay_id: `id-${Math.random()}` },
8282
);
83-
public updateDelayedEvent = jest.fn();
84-
public sendToDevice = jest.fn();
85-
public requestOpenIDConnectToken = jest.fn(() => {
83+
public updateDelayedEvent = jest.fn().mockResolvedValue(undefined);
84+
public sendToDevice = jest.fn().mockResolvedValue(undefined);
85+
public requestOpenIDConnectToken = jest.fn(async () => {
8686
return testOIDCToken;
8787
return new Promise<IOpenIDCredentials>(() => {
8888
return testOIDCToken;
8989
});
9090
});
91-
public readStateEvents = jest.fn(() => []);
92-
public getTurnServers = jest.fn(() => []);
93-
public sendContentLoaded = jest.fn();
91+
public readStateEvents = jest.fn(async () => []);
92+
public getTurnServers = jest.fn(async () => []);
93+
public sendContentLoaded = jest.fn().mockResolvedValue(undefined);
9494

9595
public transport = {
9696
reply: jest.fn(),

Diff for: spec/unit/matrixrtc/MembershipManager.spec.ts

+110-14
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ limitations under the License.
1919

2020
import { type MockedFunction, type Mock } from "jest-mock";
2121

22-
import { EventType, HTTPError, MatrixError, type Room } from "../../../src";
22+
import { EventType, HTTPError, MatrixError, UnsupportedDelayedEventsEndpointError, type Room } from "../../../src";
2323
import { type Focus, type LivekitFocusActive, type SessionMembershipData } from "../../../src/matrixrtc";
24-
import { LegacyMembershipManager } from "../../../src/matrixrtc/MembershipManager";
24+
import { LegacyMembershipManager } from "../../../src/matrixrtc/LegacyMembershipManager";
2525
import { makeMockClient, makeMockRoom, membershipTemplate, mockCallMembership, type MockClient } from "./mocks";
26+
import { MembershipManager } from "../../../src/matrixrtc/NewMembershipManager";
2627
import { defer } from "../../../src/utils";
2728

2829
function waitForMockCall(method: MockedFunction<any>, returnVal?: Promise<any>) {
@@ -44,9 +45,10 @@ function createAsyncHandle(method: MockedFunction<any>) {
4445
* Tests different MembershipManager implementations. Some tests don't apply to `LegacyMembershipManager`
4546
* use !FailsForLegacy to skip those. See: testEnvironment for more details.
4647
*/
48+
4749
describe.each([
4850
{ TestMembershipManager: LegacyMembershipManager, description: "LegacyMembershipManager" },
49-
// { TestMembershipManager: MembershipManager, description: "MembershipManager" },
51+
{ TestMembershipManager: MembershipManager, description: "MembershipManager" },
5052
])("$description", ({ TestMembershipManager }) => {
5153
let client: MockClient;
5254
let room: Room;
@@ -244,7 +246,12 @@ describe.each([
244246
const delayedHandle = createAsyncHandle(client._unstable_sendDelayedStateEvent as Mock);
245247
const manager = new TestMembershipManager({}, room, client, () => undefined);
246248
manager.join([focus], focusActive);
247-
delayedHandle.reject?.(Error("Server does not support the delayed events API"));
249+
delayedHandle.reject?.(
250+
new UnsupportedDelayedEventsEndpointError(
251+
"Server does not support the delayed events API",
252+
"sendDelayedStateEvent",
253+
),
254+
);
248255
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1);
249256
});
250257
it("does try to schedule a delayed leave event again if rate limited", async () => {
@@ -328,6 +335,7 @@ describe.each([
328335
await jest.advanceTimersByTimeAsync(1);
329336
(client._unstable_updateDelayedEvent as Mock<any>).mockRejectedValue("unknown");
330337
await manager.leave();
338+
331339
// We send a normal leave event since we failed using updateDelayedEvent with the "send" action.
332340
expect(client.sendStateEvent).toHaveBeenLastCalledWith(
333341
room.roomId,
@@ -337,9 +345,9 @@ describe.each([
337345
);
338346
});
339347
// FailsForLegacy because legacy implementation always sends the empty state event even though it isn't needed
340-
it("does nothing if not joined !FailsForLegacy", async () => {
348+
it("does nothing if not joined !FailsForLegacy", () => {
341349
const manager = new TestMembershipManager({}, room, client, () => undefined);
342-
await manager.leave();
350+
expect(async () => await manager.leave()).not.toThrow();
343351
expect(client._unstable_sendDelayedStateEvent).not.toHaveBeenCalled();
344352
expect(client.sendStateEvent).not.toHaveBeenCalled();
345353
});
@@ -470,10 +478,10 @@ describe.each([
470478
// !FailsForLegacy because the expires logic was removed for the legacy call manager.
471479
// Delayed events should replace it entirely but before they have wide adoption
472480
// the expiration logic still makes sense.
473-
// TODO: add git commit when we removed it.
474-
it("extends `expires` when call still active !FailsForLegacy", async () => {
481+
// TODO: Add git commit when we removed it.
482+
async function testExpires(expire: number, headroom?: number) {
475483
const manager = new TestMembershipManager(
476-
{ membershipExpiryTimeout: 10_000 },
484+
{ membershipExpiryTimeout: expire, membershipExpiryTimeoutHeadroom: headroom },
477485
room,
478486
client,
479487
() => undefined,
@@ -482,13 +490,19 @@ describe.each([
482490
await waitForMockCall(client.sendStateEvent);
483491
expect(client.sendStateEvent).toHaveBeenCalledTimes(1);
484492
const sentMembership = (client.sendStateEvent as Mock).mock.calls[0][2] as SessionMembershipData;
485-
expect(sentMembership.expires).toBe(10_000);
493+
expect(sentMembership.expires).toBe(expire);
486494
for (let i = 2; i <= 12; i++) {
487-
await jest.advanceTimersByTimeAsync(10_000);
495+
await jest.advanceTimersByTimeAsync(expire);
488496
expect(client.sendStateEvent).toHaveBeenCalledTimes(i);
489497
const sentMembership = (client.sendStateEvent as Mock).mock.lastCall![2] as SessionMembershipData;
490-
expect(sentMembership.expires).toBe(10_000 * i);
498+
expect(sentMembership.expires).toBe(expire * i);
491499
}
500+
}
501+
it("extends `expires` when call still active !FailsForLegacy", async () => {
502+
await testExpires(10_000);
503+
});
504+
it("extends `expires` using headroom configuration !FailsForLegacy", async () => {
505+
await testExpires(10_000, 1_000);
492506
});
493507
});
494508

@@ -544,7 +558,7 @@ describe.each([
544558
expect(client.sendStateEvent).toHaveBeenCalledTimes(1);
545559
});
546560
// FailsForLegacy as implementation does not re-check membership before retrying.
547-
it("abandons retry loop if leave() was called !FailsForLegacy", async () => {
561+
it("abandons retry loop if leave() was called before sending state event !FailsForLegacy", async () => {
548562
const handle = createAsyncHandle(client._unstable_sendDelayedStateEvent);
549563

550564
const manager = new TestMembershipManager({}, room, client, () => undefined);
@@ -565,7 +579,6 @@ describe.each([
565579
await manager.leave();
566580

567581
// Wait for all timers to be setup
568-
// await flushPromises();
569582
await jest.advanceTimersByTimeAsync(1000);
570583

571584
// No new events should have been sent:
@@ -603,4 +616,87 @@ describe.each([
603616
});
604617
});
605618
});
619+
describe("unrecoverable errors", () => {
620+
// !FailsForLegacy because legacy does not have a retry limit and no mechanism to communicate unrecoverable errors.
621+
it("throws, when reaching maximum number of retries for initial delayed event creation !FailsForLegacy", async () => {
622+
const delayEventSendError = jest.fn();
623+
(client._unstable_sendDelayedStateEvent as Mock<any>).mockRejectedValue(
624+
new MatrixError(
625+
{ errcode: "M_LIMIT_EXCEEDED" },
626+
429,
627+
undefined,
628+
undefined,
629+
new Headers({ "Retry-After": "2" }),
630+
),
631+
);
632+
const manager = new TestMembershipManager({}, room, client, () => undefined);
633+
manager.join([focus], focusActive, delayEventSendError);
634+
635+
for (let i = 0; i < 10; i++) {
636+
await jest.advanceTimersByTimeAsync(2000);
637+
}
638+
expect(delayEventSendError).toHaveBeenCalled();
639+
});
640+
// !FailsForLegacy because legacy does not have a retry limit and no mechanism to communicate unrecoverable errors.
641+
it("throws, when reaching maximum number of retries !FailsForLegacy", async () => {
642+
const delayEventRestartError = jest.fn();
643+
(client._unstable_updateDelayedEvent as Mock<any>).mockRejectedValue(
644+
new MatrixError(
645+
{ errcode: "M_LIMIT_EXCEEDED" },
646+
429,
647+
undefined,
648+
undefined,
649+
new Headers({ "Retry-After": "1" }),
650+
),
651+
);
652+
const manager = new TestMembershipManager({}, room, client, () => undefined);
653+
manager.join([focus], focusActive, delayEventRestartError);
654+
655+
for (let i = 0; i < 10; i++) {
656+
await jest.advanceTimersByTimeAsync(1000);
657+
}
658+
expect(delayEventRestartError).toHaveBeenCalled();
659+
});
660+
it("falls back to using pure state events when some error occurs while sending delayed events !FailsForLegacy", async () => {
661+
const unrecoverableError = jest.fn();
662+
(client._unstable_sendDelayedStateEvent as Mock<any>).mockRejectedValue(new HTTPError("unknown", 601));
663+
const manager = new TestMembershipManager({}, room, client, () => undefined);
664+
manager.join([focus], focusActive, unrecoverableError);
665+
await waitForMockCall(client.sendStateEvent);
666+
expect(unrecoverableError).not.toHaveBeenCalledWith();
667+
expect(client.sendStateEvent).toHaveBeenCalled();
668+
});
669+
it("retries before failing in case its a network error !FailsForLegacy", async () => {
670+
const unrecoverableError = jest.fn();
671+
(client._unstable_sendDelayedStateEvent as Mock<any>).mockRejectedValue(new HTTPError("unknown", 501));
672+
const manager = new TestMembershipManager(
673+
{ callMemberEventRetryDelayMinimum: 1000, maximumNetworkErrorRetryCount: 7 },
674+
room,
675+
client,
676+
() => undefined,
677+
);
678+
manager.join([focus], focusActive, unrecoverableError);
679+
for (let retries = 0; retries < 7; retries++) {
680+
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(retries + 1);
681+
await jest.advanceTimersByTimeAsync(1000);
682+
}
683+
expect(unrecoverableError).toHaveBeenCalled();
684+
expect(unrecoverableError.mock.lastCall![0].message).toMatch(
685+
"The MembershipManager shut down because of the end condition",
686+
);
687+
expect(client.sendStateEvent).not.toHaveBeenCalled();
688+
});
689+
it("falls back to using pure state events when UnsupportedDelayedEventsEndpointError encountered for delayed events !FailsForLegacy", async () => {
690+
const unrecoverableError = jest.fn();
691+
(client._unstable_sendDelayedStateEvent as Mock<any>).mockRejectedValue(
692+
new UnsupportedDelayedEventsEndpointError("not supported", "sendDelayedStateEvent"),
693+
);
694+
const manager = new TestMembershipManager({}, room, client, () => undefined);
695+
manager.join([focus], focusActive, unrecoverableError);
696+
await jest.advanceTimersByTimeAsync(1);
697+
698+
expect(unrecoverableError).not.toHaveBeenCalled();
699+
expect(client.sendStateEvent).toHaveBeenCalled();
700+
});
701+
});
606702
});

Diff for: spec/unit/matrixrtc/mocks.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export const membershipTemplate: SessionMembershipData = {
2525
call_id: "",
2626
device_id: "AAAAAAA",
2727
scope: "m.room",
28-
focus_active: { type: "livekit", livekit_service_url: "https://lk.url" },
28+
focus_active: { type: "livekit", focus_selection: "oldest_membership" },
2929
foci_preferred: [
3030
{
3131
livekit_alias: "!alias:something.org",

Diff for: src/client.ts

+17-4
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ import {
240240
validateAuthMetadataAndKeys,
241241
} from "./oidc/index.ts";
242242
import { type EmptyObject } from "./@types/common.ts";
243+
import { UnsupportedDelayedEventsEndpointError } from "./errors.ts";
243244

244245
export type Store = IStore;
245246

@@ -3351,7 +3352,10 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
33513352
txnId?: string,
33523353
): Promise<SendDelayedEventResponse> {
33533354
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
3354-
throw Error("Server does not support the delayed events API");
3355+
throw new UnsupportedDelayedEventsEndpointError(
3356+
"Server does not support the delayed events API",
3357+
"sendDelayedEvent",
3358+
);
33553359
}
33563360

33573361
this.addThreadRelationIfNeeded(content, threadId, roomId);
@@ -3374,7 +3378,10 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
33743378
opts: IRequestOpts = {},
33753379
): Promise<SendDelayedEventResponse> {
33763380
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
3377-
throw Error("Server does not support the delayed events API");
3381+
throw new UnsupportedDelayedEventsEndpointError(
3382+
"Server does not support the delayed events API",
3383+
"sendDelayedStateEvent",
3384+
);
33783385
}
33793386

33803387
const pathParams = {
@@ -3398,7 +3405,10 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
33983405
// eslint-disable-next-line
33993406
public async _unstable_getDelayedEvents(fromToken?: string): Promise<DelayedEventInfo> {
34003407
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
3401-
throw Error("Server does not support the delayed events API");
3408+
throw new UnsupportedDelayedEventsEndpointError(
3409+
"Server does not support the delayed events API",
3410+
"getDelayedEvents",
3411+
);
34023412
}
34033413

34043414
const queryDict = fromToken ? { from: fromToken } : undefined;
@@ -3420,7 +3430,10 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
34203430
requestOptions: IRequestOpts = {},
34213431
): Promise<EmptyObject> {
34223432
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
3423-
throw Error("Server does not support the delayed events API");
3433+
throw new UnsupportedDelayedEventsEndpointError(
3434+
"Server does not support the delayed events API",
3435+
"updateDelayedEvent",
3436+
);
34243437
}
34253438

34263439
const path = utils.encodeUri("/delayed_events/$delayId", {

0 commit comments

Comments
 (0)