Skip to content

Read receipts for threads proof of concept (MSC3771) #2559

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
5 changes: 3 additions & 2 deletions spec/unit/room.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ import {
RoomEvent,
} from "../../src";
import { EventTimeline } from "../../src/models/event-timeline";
import { IWrappedReceipt, Room } from "../../src/models/room";
import { Room } from "../../src/models/room";
import { RoomState } from "../../src/models/room-state";
import { UNSTABLE_ELEMENT_FUNCTIONAL_USERS } from "../../src/@types/event";
import { TestClient } from "../TestClient";
import { emitPromise } from "../test-utils/test-utils";
import { ReceiptType } from "../../src/@types/read_receipts";
import { Thread, ThreadEvent } from "../../src/models/thread";
import { WrappedReceipt } from "../../src/models/timeline-receipts";

describe("Room", function() {
const roomId = "!foo:bar";
Expand Down Expand Up @@ -2429,7 +2430,7 @@ describe("Room", function() {

it("handles missing receipt type", () => {
room.getReadReceiptForUserId = (userId, ignore, receiptType) => {
return receiptType === ReceiptType.ReadPrivate ? { eventId: "eventId" } as IWrappedReceipt : null;
return receiptType === ReceiptType.ReadPrivate ? { eventId: "eventId" } as WrappedReceipt : null;
};

expect(room.getEventReadUpTo(userA)).toEqual("eventId");
Expand Down
7 changes: 7 additions & 0 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4589,6 +4589,12 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
$receiptType: receiptType,
$eventId: event.getId(),
});

const isThread = !!event.threadRootId;
if (isThread) {
body.thread_id = event.threadRootId;
}

const promise = this.http.authedRequest(callback, Method.Post, path, undefined, body || {});

const room = this.getRoom(event.getRoomId());
Expand All @@ -4607,6 +4613,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
* @return {module:http-api.MatrixError} Rejects: with an error response.
*/
public async sendReadReceipt(event: MatrixEvent, receiptType = ReceiptType.Read, callback?: Callback): Promise<{}> {
if (!event) return;
const eventId = event.getId();
const room = this.getRoom(event.getRoomId());
if (room && room.hasPendingEvent(eventId)) {
Expand Down
4 changes: 2 additions & 2 deletions src/http-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ export class MatrixHttpApi {
};

// set an initial timeout of 30s; we'll advance it each time we get a progress notification
let timeoutTimer = callbacks.setTimeout(timeoutFn, 30000);
let timeoutTimer = callbacks.setTimeout(timeoutFn, 60000);

xhr.onreadystatechange = function() {
let resp: string;
Expand Down Expand Up @@ -421,7 +421,7 @@ export class MatrixHttpApi {
callbacks.clearTimeout(timeoutTimer);
upload.loaded = ev.loaded;
upload.total = ev.total;
timeoutTimer = callbacks.setTimeout(timeoutFn, 30000);
timeoutTimer = callbacks.setTimeout(timeoutFn, 60000);
if (opts.progressHandler) {
opts.progressHandler({
loaded: ev.loaded,
Expand Down
2 changes: 1 addition & 1 deletion src/models/room-member.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ export class RoomMember extends TypedEventEmitter<RoomMemberEvent, RoomMemberEve
* @fires module:client~MatrixClient#event:"RoomMember.typing"
*/
public setTypingEvent(event: MatrixEvent): void {
if (event.getType() !== "m.typing") {
if (event.getType() !== EventType.Typing) {
return;
}
const oldTyping = this.typing;
Expand Down
192 changes: 20 additions & 172 deletions src/models/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ import {
FILTER_RELATED_BY_SENDERS,
ThreadFilterType,
} from "./thread";
import { TypedEventEmitter } from "./typed-event-emitter";
import { ReceiptType } from "../@types/read_receipts";
import { IStateEventWithRoomId } from "../@types/search";
import { RelationsContainer } from "./relations-container";
import { ReceiptContent, synthesizeReceipt, TimelineReceipts } from "./timeline-receipts";

// These constants are used as sane defaults when the homeserver doesn't support
// the m.room_versions capability. In practice, KNOWN_SAFE_ROOM_VERSION should be
Expand All @@ -60,23 +60,6 @@ import { RelationsContainer } from "./relations-container";
export const KNOWN_SAFE_ROOM_VERSION = '9';
const SAFE_ROOM_VERSIONS = ['1', '2', '3', '4', '5', '6', '7', '8', '9'];

function synthesizeReceipt(userId: string, event: MatrixEvent, receiptType: ReceiptType): MatrixEvent {
// console.log("synthesizing receipt for "+event.getId());
return new MatrixEvent({
content: {
[event.getId()]: {
[receiptType]: {
[userId]: {
ts: event.getTs(),
},
},
},
},
type: EventType.Receipt,
room_id: event.getRoomId(),
});
}

interface IOpts {
storageToken?: string;
pendingEventOrdering?: PendingEventOrdering;
Expand All @@ -90,40 +73,6 @@ export interface IRecommendedVersion {
urgent: boolean;
}

interface IReceipt {
ts: number;
}

export interface IWrappedReceipt {
eventId: string;
data: IReceipt;
}

interface ICachedReceipt {
type: ReceiptType;
userId: string;
data: IReceipt;
}

type ReceiptCache = {[eventId: string]: ICachedReceipt[]};

interface IReceiptContent {
[eventId: string]: {
[key in ReceiptType]: {
[userId: string]: IReceipt;
};
};
}

const ReceiptPairRealIndex = 0;
const ReceiptPairSyntheticIndex = 1;
// We will only hold a synthetic receipt if we do not have a real receipt or the synthetic is newer.
type Receipts = {
[receiptType: string]: {
[userId: string]: [IWrappedReceipt, IWrappedReceipt]; // Pair<real receipt, synthetic receipt> (both nullable)
};
};

// When inserting a visibility event affecting event `eventId`, we
// need to scan through existing visibility events for `eventId`.
// In theory, this could take an unlimited amount of time if:
Expand Down Expand Up @@ -207,15 +156,9 @@ export type RoomEventHandlerMap = {
[ThreadEvent.New]: (thread: Thread, toStartOfTimeline: boolean) => void;
} & ThreadHandlerMap & MatrixEventHandlerMap;

export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap> {
export class Room extends TimelineReceipts<EmittedEvents, RoomEventHandlerMap> {
public readonly reEmitter: TypedReEmitter<EmittedEvents, RoomEventHandlerMap>;
private txnToEvent: Record<string, MatrixEvent> = {}; // Pending in-flight requests { string: MatrixEvent }
// receipts should clobber based on receipt_type and user_id pairs hence
// the form of this structure. This is sub-optimal for the exposed APIs
// which pass in an event ID and get back some receipts, so we also store
// a pre-cached list for this purpose.
private receipts: Receipts = {}; // { receipt_type: { user_id: IReceipt } }
private receiptCacheByEventId: ReceiptCache = {}; // { event_id: ICachedReceipt[] }
private notificationCounts: Partial<Record<NotificationCountType, number>> = {};
private readonly timelineSets: EventTimelineSet[];
public readonly threadsTimelineSets: EventTimelineSet[] = [];
Expand Down Expand Up @@ -2592,7 +2535,7 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>

let latest = privateReadReceipt;
[unstablePrivateReadReceipt, publicReadReceipt].forEach((receipt) => {
if (receipt?.data?.ts > latest?.data?.ts || !latest) {
if (receipt?.data?.ts > latest?.data?.ts) {
latest = receipt;
}
});
Expand Down Expand Up @@ -2658,123 +2601,28 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
* @param {Boolean} synthetic True if this event is implicit.
*/
public addReceipt(event: MatrixEvent, synthetic = false): void {
this.addReceiptsToStructure(event, synthetic);
// send events after we've regenerated the structure & cache, otherwise things that
// listened for the event would read stale data.
this.emit(RoomEvent.Receipt, event, this);
}

/**
* Add a receipt event to the room.
* @param {MatrixEvent} event The m.receipt event.
* @param {Boolean} synthetic True if this event is implicit.
*/
private addReceiptsToStructure(event: MatrixEvent, synthetic: boolean): void {
const content = event.getContent<IReceiptContent>();
Object.keys(content).forEach((eventId) => {
Object.keys(content[eventId]).forEach((receiptType) => {
Object.keys(content[eventId][receiptType]).forEach((userId) => {
const receipt = content[eventId][receiptType][userId];

if (!this.receipts[receiptType]) {
this.receipts[receiptType] = {};
}
if (!this.receipts[receiptType][userId]) {
this.receipts[receiptType][userId] = [null, null];
}

const pair = this.receipts[receiptType][userId];

let existingReceipt = pair[ReceiptPairRealIndex];
if (synthetic) {
existingReceipt = pair[ReceiptPairSyntheticIndex] ?? pair[ReceiptPairRealIndex];
}

if (existingReceipt) {
// we only want to add this receipt if we think it is later than the one we already have.
// This is managed server-side, but because we synthesize RRs locally we have to do it here too.
const ordering = this.getUnfilteredTimelineSet().compareEventOrdering(
existingReceipt.eventId,
eventId,
);
if (ordering !== null && ordering >= 0) {
return;
}
}

const wrappedReceipt: IWrappedReceipt = {
const content = event.getContent<ReceiptContent>();
Object.keys(content).forEach((eventId: string) => {
Object.keys(content[eventId]).forEach((receiptType: ReceiptType) => {
Object.keys(content[eventId][receiptType]).forEach((userId: string) => {
// hack, threadId should be thread_id
const receipt = content[eventId][receiptType][userId] as any;

const receiptDestination = this.threads.get(receipt.thread_id) ?? this;
receiptDestination.addReceiptToStructure(
eventId,
data: receipt,
};

const realReceipt = synthetic ? pair[ReceiptPairRealIndex] : wrappedReceipt;
const syntheticReceipt = synthetic ? wrappedReceipt : pair[ReceiptPairSyntheticIndex];

let ordering: number | null = null;
if (realReceipt && syntheticReceipt) {
ordering = this.getUnfilteredTimelineSet().compareEventOrdering(
realReceipt.eventId,
syntheticReceipt.eventId,
);
}

const preferSynthetic = ordering === null || ordering < 0;

// we don't bother caching just real receipts by event ID as there's nothing that would read it.
// Take the current cached receipt before we overwrite the pair elements.
const cachedReceipt = pair[ReceiptPairSyntheticIndex] ?? pair[ReceiptPairRealIndex];

if (synthetic && preferSynthetic) {
pair[ReceiptPairSyntheticIndex] = wrappedReceipt;
} else if (!synthetic) {
pair[ReceiptPairRealIndex] = wrappedReceipt;

if (!preferSynthetic) {
pair[ReceiptPairSyntheticIndex] = null;
}
}

const newCachedReceipt = pair[ReceiptPairSyntheticIndex] ?? pair[ReceiptPairRealIndex];
if (cachedReceipt === newCachedReceipt) return;

// clean up any previous cache entry
if (cachedReceipt && this.receiptCacheByEventId[cachedReceipt.eventId]) {
const previousEventId = cachedReceipt.eventId;
// Remove the receipt we're about to clobber out of existence from the cache
this.receiptCacheByEventId[previousEventId] = (
this.receiptCacheByEventId[previousEventId].filter(r => {
return r.type !== receiptType || r.userId !== userId;
})
);

if (this.receiptCacheByEventId[previousEventId].length < 1) {
delete this.receiptCacheByEventId[previousEventId]; // clean up the cache keys
}
}

// cache the new one
if (!this.receiptCacheByEventId[eventId]) {
this.receiptCacheByEventId[eventId] = [];
}
this.receiptCacheByEventId[eventId].push({
userId: userId,
type: receiptType as ReceiptType,
data: receipt,
});
receiptType,
userId,
receipt,
synthetic,
);
});
});
});
}

/**
* Add a temporary local-echo receipt to the room to reflect in the
* client the fact that we've sent one.
* @param {string} userId The user ID if the receipt sender
* @param {MatrixEvent} e The event that is to be acknowledged
* @param {ReceiptType} receiptType The type of receipt
*/
public addLocalEchoReceipt(userId: string, e: MatrixEvent, receiptType: ReceiptType): void {
this.addReceipt(synthesizeReceipt(userId, e, receiptType), true);
// send events after we've regenerated the structure & cache, otherwise things that
// listened for the event would read stale data.
this.emit(RoomEvent.Receipt, event, this);
}

/**
Expand Down
16 changes: 14 additions & 2 deletions src/models/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import { IThreadBundledRelationship, MatrixEvent } from "./event";
import { Direction, EventTimeline } from "./event-timeline";
import { EventTimelineSet, EventTimelineSetHandlerMap } from './event-timeline-set';
import { Room } from './room';
import { TypedEventEmitter } from "./typed-event-emitter";
import { RoomState } from "./room-state";
import { ServerControlledNamespacedValue } from "../NamespacedValue";
import { logger } from "../logger";
import { TimelineReceipts } from "./timeline-receipts";

export enum ThreadEvent {
New = "Thread.new",
Expand Down Expand Up @@ -54,7 +54,7 @@ interface IThreadOpts {
/**
* @experimental
*/
export class Thread extends TypedEventEmitter<EmittedEvents, EventHandlerMap> {
export class Thread extends TimelineReceipts<EmittedEvents, EventHandlerMap> {
public static hasServerSideSupport: boolean;

/**
Expand Down Expand Up @@ -429,6 +429,18 @@ export class Thread extends TypedEventEmitter<EmittedEvents, EventHandlerMap> {
nextBatch,
};
}

public getUnfilteredTimelineSet(): EventTimelineSet {
return this.timelineSet;
}

public get timeline(): MatrixEvent[] {
return this.events;
}

public addReceipt(event: MatrixEvent, synthetic: boolean): void {
throw new Error("Unsupported function on the thread model");
}
}

export const FILTER_RELATED_BY_SENDERS = new ServerControlledNamespacedValue(
Expand Down
Loading