@@ -17,11 +17,13 @@ limitations under the License.
17
17
import { MatrixClient } from "../matrix" ;
18
18
import { ReEmitter } from "../ReEmitter" ;
19
19
import { RelationType } from "../@types/event" ;
20
+ import { IRelationsRequestOpts } from "../@types/requests" ;
20
21
import { MatrixEvent , IThreadBundledRelationship } from "./event" ;
21
- import { EventTimeline } from "./event-timeline" ;
22
+ import { Direction , EventTimeline } from "./event-timeline" ;
22
23
import { EventTimelineSet } from './event-timeline-set' ;
23
24
import { Room } from './room' ;
24
25
import { TypedEventEmitter } from "./typed-event-emitter" ;
26
+ import { RoomState } from "./room-state" ;
25
27
26
28
export enum ThreadEvent {
27
29
New = "Thread.new" ,
@@ -31,14 +33,16 @@ export enum ThreadEvent {
31
33
ViewThread = "Thred.viewThread" ,
32
34
}
33
35
36
+ interface IThreadOpts {
37
+ initialEvents ?: MatrixEvent [ ] ;
38
+ room : Room ;
39
+ client : MatrixClient ;
40
+ }
41
+
34
42
/**
35
43
* @experimental
36
44
*/
37
45
export class Thread extends TypedEventEmitter < ThreadEvent > {
38
- /**
39
- * A reference to the event ID at the top of the thread
40
- */
41
- private root : string ;
42
46
/**
43
47
* A reference to all the events ID at the bottom of the threads
44
48
*/
@@ -51,33 +55,37 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
51
55
private lastEvent : MatrixEvent ;
52
56
private replyCount = 0 ;
53
57
58
+ public readonly room : Room ;
59
+ public readonly client : MatrixClient ;
60
+
61
+ public initialEventsFetched = false ;
62
+
54
63
constructor (
55
- events : MatrixEvent [ ] = [ ] ,
56
- public readonly room : Room ,
57
- public readonly client : MatrixClient ,
64
+ public readonly rootEvent : MatrixEvent ,
65
+ opts : IThreadOpts ,
58
66
) {
59
67
super ( ) ;
60
- if ( events . length === 0 ) {
61
- throw new Error ( "Can't create an empty thread" ) ;
62
- }
63
-
64
- this . reEmitter = new ReEmitter ( this ) ;
65
68
69
+ this . room = opts . room ;
70
+ this . client = opts . client ;
66
71
this . timelineSet = new EventTimelineSet ( this . room , {
67
72
unstableClientRelationAggregation : true ,
68
73
timelineSupport : true ,
69
74
pendingEvents : true ,
70
75
} ) ;
76
+ this . reEmitter = new ReEmitter ( this ) ;
77
+
78
+ this . initialiseThread ( this . rootEvent ) ;
71
79
72
80
this . reEmitter . reEmit ( this . timelineSet , [
73
81
"Room.timeline" ,
74
82
"Room.timelineReset" ,
75
83
] ) ;
76
84
77
- events . forEach ( event => this . addEvent ( event ) ) ;
85
+ opts ?. initialEvents . forEach ( event => this . addEvent ( event ) ) ;
78
86
79
- room . on ( "Room.localEchoUpdated" , this . onEcho ) ;
80
- room . on ( "Room.timeline" , this . onEcho ) ;
87
+ this . room . on ( "Room.localEchoUpdated" , this . onEcho ) ;
88
+ this . room . on ( "Room.timeline" , this . onEcho ) ;
81
89
}
82
90
83
91
public get hasServerSideSupport ( ) : boolean {
@@ -91,85 +99,115 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
91
99
}
92
100
} ;
93
101
102
+ private get roomState ( ) : RoomState {
103
+ return this . room . getLiveTimeline ( ) . getState ( EventTimeline . FORWARDS ) ;
104
+ }
105
+
94
106
/**
95
107
* Add an event to the thread and updates
96
108
* the tail/root references if needed
97
109
* Will fire "Thread.update"
98
110
* @param event The event to add
99
111
*/
100
112
public async addEvent ( event : MatrixEvent , toStartOfTimeline = false ) : Promise < void > {
101
- if ( this . timelineSet . findEventById ( event . getId ( ) ) ) {
102
- return ;
113
+ // Add all incoming events to the thread's timeline set when there's
114
+ // no server support
115
+ if ( ! this . hasServerSideSupport ) {
116
+ if ( this . timelineSet . findEventById ( event . getId ( ) ) ) {
117
+ return ;
118
+ }
119
+
120
+ // all the relevant membership info to hydrate events with a sender
121
+ // is held in the main room timeline
122
+ // We want to fetch the room state from there and pass it down to this thread
123
+ // timeline set to let it reconcile an event with its relevant RoomMember
124
+
125
+ event . setThread ( this ) ;
126
+ this . timelineSet . addEventToTimeline (
127
+ event ,
128
+ this . liveTimeline ,
129
+ toStartOfTimeline ,
130
+ false ,
131
+ this . roomState ,
132
+ ) ;
133
+
134
+ await this . client . decryptEventIfNeeded ( event , { } ) ;
103
135
}
104
136
105
- if ( ! this . root ) {
106
- if ( event . isThreadRelation ) {
107
- this . root = event . threadRootId ;
108
- } else {
109
- this . root = event . getId ( ) ;
137
+ if ( this . hasServerSideSupport && this . initialEventsFetched ) {
138
+ if ( event . localTimestamp > this . lastReply ( ) . localTimestamp && ! this . findEventById ( event . getId ( ) ) ) {
139
+ this . timelineSet . addEventToTimeline (
140
+ event ,
141
+ this . liveTimeline ,
142
+ false ,
143
+ false ,
144
+ this . roomState ,
145
+ ) ;
110
146
}
111
147
}
112
148
113
- // all the relevant membership info to hydrate events with a sender
114
- // is held in the main room timeline
115
- // We want to fetch the room state from there and pass it down to this thread
116
- // timeline set to let it reconcile an event with its relevant RoomMember
117
- const roomState = this . room . getLiveTimeline ( ) . getState ( EventTimeline . FORWARDS ) ;
118
-
119
- event . setThread ( this ) ;
120
- this . timelineSet . addEventToTimeline (
121
- event ,
122
- this . timelineSet . getLiveTimeline ( ) ,
123
- toStartOfTimeline ,
124
- false ,
125
- roomState ,
126
- ) ;
127
-
128
149
if ( ! this . _currentUserParticipated && event . getSender ( ) === this . client . getUserId ( ) ) {
129
150
this . _currentUserParticipated = true ;
130
151
}
131
152
132
- await this . client . decryptEventIfNeeded ( event , { } ) ;
133
-
134
153
const isThreadReply = event . getRelation ( ) ?. rel_type === RelationType . Thread ;
135
154
// If no thread support exists we want to count all thread relation
136
155
// added as a reply. We can't rely on the bundled relationships count
137
156
if ( ! this . hasServerSideSupport && isThreadReply ) {
138
157
this . replyCount ++ ;
139
158
}
140
159
141
- if ( ! this . lastEvent || ( isThreadReply && event . getTs ( ) > this . lastEvent . getTs ( ) ) ) {
160
+ // There is a risk that the `localTimestamp` approximation will not be accurate
161
+ // when threads are used over federation. That could results in the reply
162
+ // count value drifting away from the value returned by the server
163
+ if ( ! this . lastEvent || ( isThreadReply && event . localTimestamp > this . replyToEvent . localTimestamp ) ) {
142
164
this . lastEvent = event ;
143
- if ( this . lastEvent . getId ( ) !== this . root ) {
165
+ if ( this . lastEvent . getId ( ) !== this . id ) {
144
166
// This counting only works when server side support is enabled
145
167
// as we started the counting from the value returned in the
146
168
// bundled relationship
147
169
if ( this . hasServerSideSupport ) {
148
170
this . replyCount ++ ;
149
171
}
172
+
150
173
this . emit ( ThreadEvent . NewReply , this , event ) ;
151
174
}
152
175
}
153
176
154
- if ( event . getId ( ) === this . root ) {
155
- const bundledRelationship = event
156
- . getServerAggregatedRelation < IThreadBundledRelationship > ( RelationType . Thread ) ;
177
+ this . emit ( ThreadEvent . Update , this ) ;
178
+ }
157
179
158
- if ( this . hasServerSideSupport && bundledRelationship ) {
159
- this . replyCount = bundledRelationship . count ;
160
- this . _currentUserParticipated = bundledRelationship . current_user_participated ;
180
+ private initialiseThread ( rootEvent : MatrixEvent ) : void {
181
+ const bundledRelationship = rootEvent
182
+ . getServerAggregatedRelation < IThreadBundledRelationship > ( RelationType . Thread ) ;
161
183
162
- const lastReply = this . findEventById ( bundledRelationship . latest_event . event_id ) ;
163
- if ( lastReply ) {
164
- this . lastEvent = lastReply ;
165
- } else {
166
- const event = new MatrixEvent ( bundledRelationship . latest_event ) ;
167
- this . lastEvent = event ;
168
- }
169
- }
184
+ if ( this . hasServerSideSupport && bundledRelationship ) {
185
+ this . replyCount = bundledRelationship . count ;
186
+ this . _currentUserParticipated = bundledRelationship . current_user_participated ;
187
+
188
+ const event = new MatrixEvent ( bundledRelationship . latest_event ) ;
189
+ this . setEventMetadata ( event ) ;
190
+ this . lastEvent = event ;
170
191
}
171
192
172
- this . emit ( ThreadEvent . Update , this ) ;
193
+ if ( ! bundledRelationship ) {
194
+ this . addEvent ( rootEvent ) ;
195
+ }
196
+ }
197
+
198
+ public async fetchInitialEvents ( ) : Promise < boolean > {
199
+ try {
200
+ await this . fetchEvents ( ) ;
201
+ this . initialEventsFetched = true ;
202
+ return true ;
203
+ } catch ( e ) {
204
+ return false ;
205
+ }
206
+ }
207
+
208
+ private setEventMetadata ( event : MatrixEvent ) : void {
209
+ EventTimeline . setEventMetadata ( event , this . roomState , false ) ;
210
+ event . setThread ( this ) ;
173
211
}
174
212
175
213
/**
@@ -185,7 +223,7 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
185
223
public lastReply ( matches : ( ev : MatrixEvent ) => boolean = ( ) => true ) : MatrixEvent {
186
224
for ( let i = this . events . length - 1 ; i >= 0 ; i -- ) {
187
225
const event = this . events [ i ] ;
188
- if ( event . isThreadRelation && matches ( event ) ) {
226
+ if ( matches ( event ) ) {
189
227
return event ;
190
228
}
191
229
}
@@ -195,14 +233,7 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
195
233
* The thread ID, which is the same as the root event ID
196
234
*/
197
235
public get id ( ) : string {
198
- return this . root ;
199
- }
200
-
201
- /**
202
- * The thread root event
203
- */
204
- public get rootEvent ( ) : MatrixEvent {
205
- return this . findEventById ( this . root ) ;
236
+ return this . rootEvent . getId ( ) ;
206
237
}
207
238
208
239
public get roomId ( ) : string {
@@ -226,14 +257,7 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
226
257
}
227
258
228
259
public get events ( ) : MatrixEvent [ ] {
229
- return this . timelineSet . getLiveTimeline ( ) . getEvents ( ) ;
230
- }
231
-
232
- public merge ( thread : Thread ) : void {
233
- thread . events . forEach ( event => {
234
- this . addEvent ( event ) ;
235
- } ) ;
236
- this . events . forEach ( event => event . setThread ( this ) ) ;
260
+ return this . liveTimeline . getEvents ( ) ;
237
261
}
238
262
239
263
public has ( eventId : string ) : boolean {
@@ -243,4 +267,55 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
243
267
public get hasCurrentUserParticipated ( ) : boolean {
244
268
return this . _currentUserParticipated ;
245
269
}
270
+
271
+ public get liveTimeline ( ) : EventTimeline {
272
+ return this . timelineSet . getLiveTimeline ( ) ;
273
+ }
274
+
275
+ public async fetchEvents ( opts : IRelationsRequestOpts = { limit : 20 } ) : Promise < {
276
+ originalEvent : MatrixEvent ;
277
+ events : MatrixEvent [ ] ;
278
+ nextBatch ?: string ;
279
+ prevBatch ?: string ;
280
+ } > {
281
+ let {
282
+ originalEvent,
283
+ events,
284
+ prevBatch,
285
+ nextBatch,
286
+ } = await this . client . relations (
287
+ this . room . roomId ,
288
+ this . id ,
289
+ RelationType . Thread ,
290
+ null ,
291
+ opts ,
292
+ ) ;
293
+
294
+ // When there's no nextBatch returned with a `from` request we have reached
295
+ // the end of the thread, and therefore want to return an empty one
296
+ if ( ! opts . to && ! nextBatch ) {
297
+ events = [ originalEvent , ...events ] ;
298
+ }
299
+
300
+ for ( const event of events ) {
301
+ await this . client . decryptEventIfNeeded ( event ) ;
302
+ this . setEventMetadata ( event ) ;
303
+ }
304
+
305
+ const prependEvents = ! opts . direction || opts . direction === Direction . Backward ;
306
+
307
+ this . timelineSet . addEventsToTimeline (
308
+ events ,
309
+ prependEvents ,
310
+ this . liveTimeline ,
311
+ prependEvents ? nextBatch : prevBatch ,
312
+ ) ;
313
+
314
+ return {
315
+ originalEvent,
316
+ events,
317
+ prevBatch,
318
+ nextBatch,
319
+ } ;
320
+ }
246
321
}
0 commit comments