Skip to content

Commit 5e3c66f

Browse files
justjannegithub-actions[bot]
authored andcommitted
Correctly handle limited sync responses by resetting the thread timeline (#3056)
* Reset thread livetimelines when desynced * Implement workaround for matrix-org/synapse#14830 (cherry picked from commit a34d06c)
1 parent 94f1eda commit 5e3c66f

File tree

3 files changed

+202
-3
lines changed

3 files changed

+202
-3
lines changed

spec/unit/models/thread.spec.ts

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import { Thread, THREAD_RELATION_TYPE, ThreadEvent } from "../../../src/models/t
2020
import { mkThread } from "../../test-utils/thread";
2121
import { TestClient } from "../../TestClient";
2222
import { emitPromise, mkMessage, mock } from "../../test-utils/test-utils";
23-
import { EventStatus, MatrixEvent } from "../../../src";
23+
import { Direction, EventStatus, MatrixEvent } from "../../../src";
2424
import { ReceiptType } from "../../../src/@types/read_receipts";
2525
import { getMockClientWithEventEmitter, mockClientMethodsUser } from "../../test-utils/client";
2626
import { ReEmitter } from "../../../src/ReEmitter";
@@ -283,4 +283,143 @@ describe("Thread", () => {
283283
expect(thread2.getEventReadUpTo(myUserId)).toBe(null);
284284
});
285285
});
286+
287+
describe("resetLiveTimeline", () => {
288+
// ResetLiveTimeline is used when we have missing messages between the current live timeline's end and newly
289+
// received messages. In that case, we want to replace the existing live timeline. To ensure pagination
290+
// continues working correctly, new pagination tokens need to be set on both the old live timeline (which is
291+
// now a regular timeline) and the new live timeline.
292+
it("replaces the live timeline and correctly sets pagination tokens", async () => {
293+
const myUserId = "@bob:example.org";
294+
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, {
295+
timelineSupport: false,
296+
});
297+
const client = testClient.client;
298+
const room = new Room("123", client, myUserId, {
299+
pendingEventOrdering: PendingEventOrdering.Detached,
300+
});
301+
302+
jest.spyOn(client, "getRoom").mockReturnValue(room);
303+
304+
const { thread } = mkThread({
305+
room,
306+
client,
307+
authorId: myUserId,
308+
participantUserIds: ["@alice:example.org"],
309+
length: 3,
310+
});
311+
await emitPromise(thread, ThreadEvent.Update);
312+
expect(thread.length).toBe(2);
313+
314+
jest.spyOn(client, "createMessagesRequest").mockImplementation((_, token) =>
315+
Promise.resolve({
316+
chunk: [],
317+
start: `${token}-new`,
318+
end: `${token}-new`,
319+
}),
320+
);
321+
322+
function timelines(): [string | null, string | null][] {
323+
return thread.timelineSet
324+
.getTimelines()
325+
.map((it) => [it.getPaginationToken(Direction.Backward), it.getPaginationToken(Direction.Forward)]);
326+
}
327+
328+
expect(timelines()).toEqual([[null, null]]);
329+
const promise = thread.resetLiveTimeline("b1", "f1");
330+
expect(timelines()).toEqual([
331+
[null, "f1"],
332+
["b1", null],
333+
]);
334+
await promise;
335+
expect(timelines()).toEqual([
336+
[null, "f1-new"],
337+
["b1-new", null],
338+
]);
339+
});
340+
341+
// As the pagination tokens cannot be used right now, resetLiveTimeline needs to replace them before they can
342+
// be used. But if in the future the bug in synapse is fixed, and they can actually be used, we can get into a
343+
// state where the client has paginated (and changed the tokens) while resetLiveTimeline tries to set the
344+
// corrected tokens. To prevent such a race condition, we make sure that resetLiveTimeline respects any
345+
// changes done to the pagination tokens.
346+
it("replaces the live timeline but does not replace changed pagination tokens", async () => {
347+
const myUserId = "@bob:example.org";
348+
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, {
349+
timelineSupport: false,
350+
});
351+
const client = testClient.client;
352+
const room = new Room("123", client, myUserId, {
353+
pendingEventOrdering: PendingEventOrdering.Detached,
354+
});
355+
356+
jest.spyOn(client, "getRoom").mockReturnValue(room);
357+
358+
const { thread } = mkThread({
359+
room,
360+
client,
361+
authorId: myUserId,
362+
participantUserIds: ["@alice:example.org"],
363+
length: 3,
364+
});
365+
await emitPromise(thread, ThreadEvent.Update);
366+
expect(thread.length).toBe(2);
367+
368+
jest.spyOn(client, "createMessagesRequest").mockImplementation((_, token) =>
369+
Promise.resolve({
370+
chunk: [],
371+
start: `${token}-new`,
372+
end: `${token}-new`,
373+
}),
374+
);
375+
376+
function timelines(): [string | null, string | null][] {
377+
return thread.timelineSet
378+
.getTimelines()
379+
.map((it) => [it.getPaginationToken(Direction.Backward), it.getPaginationToken(Direction.Forward)]);
380+
}
381+
382+
expect(timelines()).toEqual([[null, null]]);
383+
const promise = thread.resetLiveTimeline("b1", "f1");
384+
expect(timelines()).toEqual([
385+
[null, "f1"],
386+
["b1", null],
387+
]);
388+
thread.timelineSet.getTimelines()[0].setPaginationToken("f2", Direction.Forward);
389+
thread.timelineSet.getTimelines()[1].setPaginationToken("b2", Direction.Backward);
390+
await promise;
391+
expect(timelines()).toEqual([
392+
[null, "f2"],
393+
["b2", null],
394+
]);
395+
});
396+
397+
it("is correctly called by the room", async () => {
398+
const myUserId = "@bob:example.org";
399+
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, {
400+
timelineSupport: false,
401+
});
402+
const client = testClient.client;
403+
const room = new Room("123", client, myUserId, {
404+
pendingEventOrdering: PendingEventOrdering.Detached,
405+
});
406+
407+
jest.spyOn(client, "getRoom").mockReturnValue(room);
408+
409+
const { thread } = mkThread({
410+
room,
411+
client,
412+
authorId: myUserId,
413+
participantUserIds: ["@alice:example.org"],
414+
length: 3,
415+
});
416+
await emitPromise(thread, ThreadEvent.Update);
417+
expect(thread.length).toBe(2);
418+
const mock = jest.spyOn(thread, "resetLiveTimeline");
419+
mock.mockReturnValue(Promise.resolve());
420+
421+
room.resetLiveTimeline("b1", "f1");
422+
expect(mock).toHaveBeenCalledWith("b1", "f1");
423+
});
424+
});
286425
});

src/models/room.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1114,6 +1114,9 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
11141114
for (const timelineSet of this.timelineSets) {
11151115
timelineSet.resetLiveTimeline(backPaginationToken ?? undefined, forwardPaginationToken ?? undefined);
11161116
}
1117+
for (const thread of this.threads.values()) {
1118+
thread.resetLiveTimeline(backPaginationToken, forwardPaginationToken);
1119+
}
11171120

11181121
this.fixUpLegacyTimelineFields();
11191122
}
@@ -1223,7 +1226,7 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
12231226
const event = this.findEventById(eventId);
12241227
const thread = this.findThreadForEvent(event);
12251228
if (thread) {
1226-
return thread.timelineSet.getLiveTimeline();
1229+
return thread.timelineSet.getTimelineForEvent(eventId);
12271230
} else {
12281231
return this.getUnfilteredTimelineSet().getTimelineForEvent(eventId);
12291232
}

src/models/thread.ts

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ export class Thread extends ReadReceipt<EmittedEvents, EventHandlerMap> {
256256
this.setEventMetadata(event);
257257

258258
const lastReply = this.lastReply();
259-
const isNewestReply = !lastReply || event.localTimestamp > lastReply!.localTimestamp;
259+
const isNewestReply = !lastReply || event.localTimestamp >= lastReply!.localTimestamp;
260260

261261
// Add all incoming events to the thread's timeline set when there's no server support
262262
if (!Thread.hasServerSideSupport) {
@@ -358,6 +358,63 @@ export class Thread extends ReadReceipt<EmittedEvents, EventHandlerMap> {
358358
this.pendingReplyCount = pendingEvents.length;
359359
}
360360

361+
/**
362+
* Reset the live timeline of all timelineSets, and start new ones.
363+
*
364+
* <p>This is used when /sync returns a 'limited' timeline. 'Limited' means that there's a gap between the messages
365+
* /sync returned, and the last known message in our timeline. In such a case, our live timeline isn't live anymore
366+
* and has to be replaced by a new one. To make sure we can continue paginating our timelines correctly, we have to
367+
* set new pagination tokens on the old and the new timeline.
368+
*
369+
* @param backPaginationToken - token for back-paginating the new timeline
370+
* @param forwardPaginationToken - token for forward-paginating the old live timeline,
371+
* if absent or null, all timelines are reset, removing old ones (including the previous live
372+
* timeline which would otherwise be unable to paginate forwards without this token).
373+
* Removing just the old live timeline whilst preserving previous ones is not supported.
374+
*/
375+
public async resetLiveTimeline(
376+
backPaginationToken?: string | null,
377+
forwardPaginationToken?: string | null,
378+
): Promise<void> {
379+
const oldLive = this.liveTimeline;
380+
this.timelineSet.resetLiveTimeline(backPaginationToken ?? undefined, forwardPaginationToken ?? undefined);
381+
const newLive = this.liveTimeline;
382+
383+
// FIXME: Remove the following as soon as https://github.com/matrix-org/synapse/issues/14830 is resolved.
384+
//
385+
// The pagination API for thread timelines currently can't handle the type of pagination tokens returned by sync
386+
//
387+
// To make this work anyway, we'll have to transform them into one of the types that the API can handle.
388+
// One option is passing the tokens to /messages, which can handle sync tokens, and returns the right format.
389+
// /messages does not return new tokens on requests with a limit of 0.
390+
// This means our timelines might overlap a slight bit, but that's not an issue, as we deduplicate messages
391+
// anyway.
392+
393+
let newBackward: string | undefined;
394+
let oldForward: string | undefined;
395+
if (backPaginationToken) {
396+
const res = await this.client.createMessagesRequest(this.roomId, backPaginationToken, 1, Direction.Forward);
397+
newBackward = res.end;
398+
}
399+
if (forwardPaginationToken) {
400+
const res = await this.client.createMessagesRequest(
401+
this.roomId,
402+
forwardPaginationToken,
403+
1,
404+
Direction.Backward,
405+
);
406+
oldForward = res.start;
407+
}
408+
// Only replace the token if we don't have paginated away from this position already. This situation doesn't
409+
// occur today, but if the above issue is resolved, we'd have to go down this path.
410+
if (forwardPaginationToken && oldLive.getPaginationToken(Direction.Forward) === forwardPaginationToken) {
411+
oldLive.setPaginationToken(oldForward ?? null, Direction.Forward);
412+
}
413+
if (backPaginationToken && newLive.getPaginationToken(Direction.Backward) === backPaginationToken) {
414+
newLive.setPaginationToken(newBackward ?? null, Direction.Backward);
415+
}
416+
}
417+
361418
private async updateThreadMetadata(): Promise<void> {
362419
this.updatePendingReplyCount();
363420

0 commit comments

Comments
 (0)