12
12
# See the License for the specific language governing permissions and
13
13
# limitations under the License.
14
14
import logging
15
- from typing import TYPE_CHECKING , Dict , Iterable , Optional
15
+ from typing import (
16
+ TYPE_CHECKING ,
17
+ Collection ,
18
+ Dict ,
19
+ FrozenSet ,
20
+ Iterable ,
21
+ List ,
22
+ Optional ,
23
+ Tuple ,
24
+ )
16
25
17
26
import attr
18
27
from frozendict import frozendict
19
28
20
29
from synapse .api .constants import RelationTypes
21
30
from synapse .api .errors import SynapseError
22
31
from synapse .events import EventBase
23
- from synapse .types import JsonDict , Requester , StreamToken
32
+ from synapse .storage .databases .main .relations import _RelatedEvent
33
+ from synapse .types import JsonDict , Requester , StreamToken , UserID
24
34
from synapse .visibility import filter_events_for_client
25
35
26
36
if TYPE_CHECKING :
@@ -115,6 +125,9 @@ async def get_relations(
115
125
if event is None :
116
126
raise SynapseError (404 , "Unknown parent event." )
117
127
128
+ # Note that ignored users are not passed into get_relations_for_event
129
+ # below. Ignored users are handled in filter_events_for_client (and by
130
+ # not passing them in here we should get a better cache hit rate).
118
131
related_events , next_token = await self ._main_store .get_relations_for_event (
119
132
event_id = event_id ,
120
133
event = event ,
@@ -128,7 +141,9 @@ async def get_relations(
128
141
to_token = to_token ,
129
142
)
130
143
131
- events = await self ._main_store .get_events_as_list (related_events )
144
+ events = await self ._main_store .get_events_as_list (
145
+ [e .event_id for e in related_events ]
146
+ )
132
147
133
148
events = await filter_events_for_client (
134
149
self ._storage , user_id , events , is_peeking = (member_event_id is None )
@@ -162,16 +177,95 @@ async def get_relations(
162
177
163
178
return return_value
164
179
180
+ async def get_relations_for_event (
181
+ self ,
182
+ event_id : str ,
183
+ event : EventBase ,
184
+ room_id : str ,
185
+ relation_type : str ,
186
+ ignored_users : FrozenSet [str ] = frozenset (),
187
+ ) -> Tuple [List [_RelatedEvent ], Optional [StreamToken ]]:
188
+ """Get a list of events which relate to an event, ordered by topological ordering.
189
+
190
+ Args:
191
+ event_id: Fetch events that relate to this event ID.
192
+ event: The matching EventBase to event_id.
193
+ room_id: The room the event belongs to.
194
+ relation_type: The type of relation.
195
+ ignored_users: The users ignored by the requesting user.
196
+
197
+ Returns:
198
+ List of event IDs that match relations requested. The rows are of
199
+ the form `{"event_id": "..."}`.
200
+ """
201
+
202
+ # Call the underlying storage method, which is cached.
203
+ related_events , next_token = await self ._main_store .get_relations_for_event (
204
+ event_id , event , room_id , relation_type , direction = "f"
205
+ )
206
+
207
+ # Filter out ignored users and convert to the expected format.
208
+ related_events = [
209
+ event for event in related_events if event .sender not in ignored_users
210
+ ]
211
+
212
+ return related_events , next_token
213
+
214
+ async def get_annotations_for_event (
215
+ self ,
216
+ event_id : str ,
217
+ room_id : str ,
218
+ limit : int = 5 ,
219
+ ignored_users : FrozenSet [str ] = frozenset (),
220
+ ) -> List [JsonDict ]:
221
+ """Get a list of annotations on the event, grouped by event type and
222
+ aggregation key, sorted by count.
223
+
224
+ This is used e.g. to get the what and how many reactions have happend
225
+ on an event.
226
+
227
+ Args:
228
+ event_id: Fetch events that relate to this event ID.
229
+ room_id: The room the event belongs to.
230
+ limit: Only fetch the `limit` groups.
231
+ ignored_users: The users ignored by the requesting user.
232
+
233
+ Returns:
234
+ List of groups of annotations that match. Each row is a dict with
235
+ `type`, `key` and `count` fields.
236
+ """
237
+ # Get the base results for all users.
238
+ full_results = await self ._main_store .get_aggregation_groups_for_event (
239
+ event_id , room_id , limit
240
+ )
241
+
242
+ # Then subtract off the results for any ignored users.
243
+ ignored_results = await self ._main_store .get_aggregation_groups_for_users (
244
+ event_id , room_id , limit , ignored_users
245
+ )
246
+
247
+ filtered_results = []
248
+ for result in full_results :
249
+ key = (result ["type" ], result ["key" ])
250
+ if key in ignored_results :
251
+ result = result .copy ()
252
+ result ["count" ] -= ignored_results [key ]
253
+ if result ["count" ] <= 0 :
254
+ continue
255
+ filtered_results .append (result )
256
+
257
+ return filtered_results
258
+
165
259
async def _get_bundled_aggregation_for_event (
166
- self , event : EventBase , user_id : str
260
+ self , event : EventBase , ignored_users : FrozenSet [ str ]
167
261
) -> Optional [BundledAggregations ]:
168
262
"""Generate bundled aggregations for an event.
169
263
170
264
Note that this does not use a cache, but depends on cached methods.
171
265
172
266
Args:
173
267
event: The event to calculate bundled aggregations for.
174
- user_id : The user requesting the bundled aggregations .
268
+ ignored_users : The users ignored by the requesting user .
175
269
176
270
Returns:
177
271
The bundled aggregations for an event, if bundled aggregations are
@@ -194,18 +288,22 @@ async def _get_bundled_aggregation_for_event(
194
288
# while others need more processing during serialization.
195
289
aggregations = BundledAggregations ()
196
290
197
- annotations = await self ._main_store . get_aggregation_groups_for_event (
198
- event_id , room_id
291
+ annotations = await self .get_annotations_for_event (
292
+ event_id , room_id , ignored_users = ignored_users
199
293
)
200
294
if annotations :
201
295
aggregations .annotations = {"chunk" : annotations }
202
296
203
- references , next_token = await self ._main_store .get_relations_for_event (
204
- event_id , event , room_id , RelationTypes .REFERENCE , direction = "f"
297
+ references , next_token = await self .get_relations_for_event (
298
+ event_id ,
299
+ event ,
300
+ room_id ,
301
+ RelationTypes .REFERENCE ,
302
+ ignored_users = ignored_users ,
205
303
)
206
304
if references :
207
305
aggregations .references = {
208
- "chunk" : [{"event_id" : event_id } for event_id in references ]
306
+ "chunk" : [{"event_id" : event . event_id } for event in references ]
209
307
}
210
308
211
309
if next_token :
@@ -216,6 +314,99 @@ async def _get_bundled_aggregation_for_event(
216
314
# Store the bundled aggregations in the event metadata for later use.
217
315
return aggregations
218
316
317
+ async def get_threads_for_events (
318
+ self , event_ids : Collection [str ], user_id : str , ignored_users : FrozenSet [str ]
319
+ ) -> Dict [str , _ThreadAggregation ]:
320
+ """Get the bundled aggregations for threads for the requested events.
321
+
322
+ Args:
323
+ event_ids: Events to get aggregations for threads.
324
+ user_id: The user requesting the bundled aggregations.
325
+ ignored_users: The users ignored by the requesting user.
326
+
327
+ Returns:
328
+ A dictionary mapping event ID to the thread information.
329
+
330
+ May not contain a value for all requested event IDs.
331
+ """
332
+ user = UserID .from_string (user_id )
333
+
334
+ # Fetch thread summaries.
335
+ summaries = await self ._main_store .get_thread_summaries (event_ids )
336
+
337
+ # Only fetch participated for a limited selection based on what had
338
+ # summaries.
339
+ thread_event_ids = [
340
+ event_id for event_id , summary in summaries .items () if summary
341
+ ]
342
+ participated = await self ._main_store .get_threads_participated (
343
+ thread_event_ids , user_id
344
+ )
345
+
346
+ # Then subtract off the results for any ignored users.
347
+ ignored_results = await self ._main_store .get_threaded_messages_per_user (
348
+ thread_event_ids , ignored_users
349
+ )
350
+
351
+ # A map of event ID to the thread aggregation.
352
+ results = {}
353
+
354
+ for event_id , summary in summaries .items ():
355
+ if summary :
356
+ thread_count , latest_thread_event , edit = summary
357
+
358
+ # Subtract off the count of any ignored users.
359
+ for ignored_user in ignored_users :
360
+ thread_count -= ignored_results .get ((event_id , ignored_user ), 0 )
361
+
362
+ # This is gnarly, but if the latest event is from an ignored user,
363
+ # attempt to find one that isn't from an ignored user.
364
+ if latest_thread_event .sender in ignored_users :
365
+ room_id = latest_thread_event .room_id
366
+
367
+ # If the root event is not found, something went wrong, do
368
+ # not include a summary of the thread.
369
+ event = await self ._event_handler .get_event (user , room_id , event_id )
370
+ if event is None :
371
+ continue
372
+
373
+ potential_events , _ = await self .get_relations_for_event (
374
+ event_id ,
375
+ event ,
376
+ room_id ,
377
+ RelationTypes .THREAD ,
378
+ ignored_users ,
379
+ )
380
+
381
+ # If all found events are from ignored users, do not include
382
+ # a summary of the thread.
383
+ if not potential_events :
384
+ continue
385
+
386
+ # The *last* event returned is the one that is cared about.
387
+ event = await self ._event_handler .get_event (
388
+ user , room_id , potential_events [- 1 ].event_id
389
+ )
390
+ # It is unexpected that the event will not exist.
391
+ if event is None :
392
+ logger .warning (
393
+ "Unable to fetch latest event in a thread with event ID: %s" ,
394
+ potential_events [- 1 ].event_id ,
395
+ )
396
+ continue
397
+ latest_thread_event = event
398
+
399
+ results [event_id ] = _ThreadAggregation (
400
+ latest_event = latest_thread_event ,
401
+ latest_edit = edit ,
402
+ count = thread_count ,
403
+ # If there's a thread summary it must also exist in the
404
+ # participated dictionary.
405
+ current_user_participated = participated [event_id ],
406
+ )
407
+
408
+ return results
409
+
219
410
async def get_bundled_aggregations (
220
411
self , events : Iterable [EventBase ], user_id : str
221
412
) -> Dict [str , BundledAggregations ]:
@@ -239,13 +430,21 @@ async def get_bundled_aggregations(
239
430
# event ID -> bundled aggregation in non-serialized form.
240
431
results : Dict [str , BundledAggregations ] = {}
241
432
433
+ # Fetch any ignored users of the requesting user.
434
+ ignored_users = await self ._main_store .ignored_users (user_id )
435
+
242
436
# Fetch other relations per event.
243
437
for event in events_by_id .values ():
244
- event_result = await self ._get_bundled_aggregation_for_event (event , user_id )
438
+ event_result = await self ._get_bundled_aggregation_for_event (
439
+ event , ignored_users
440
+ )
245
441
if event_result :
246
442
results [event .event_id ] = event_result
247
443
248
444
# Fetch any edits (but not for redacted events).
445
+ #
446
+ # Note that there is no use in limiting edits by ignored users since the
447
+ # parent event should be ignored in the first place if the user is ignored.
249
448
edits = await self ._main_store .get_applicable_edits (
250
449
[
251
450
event_id
@@ -256,25 +455,10 @@ async def get_bundled_aggregations(
256
455
for event_id , edit in edits .items ():
257
456
results .setdefault (event_id , BundledAggregations ()).replace = edit
258
457
259
- # Fetch thread summaries.
260
- summaries = await self ._main_store .get_thread_summaries (events_by_id .keys ())
261
- # Only fetch participated for a limited selection based on what had
262
- # summaries.
263
- participated = await self ._main_store .get_threads_participated (
264
- [event_id for event_id , summary in summaries .items () if summary ], user_id
458
+ threads = await self .get_threads_for_events (
459
+ events_by_id .keys (), user_id , ignored_users
265
460
)
266
- for event_id , summary in summaries .items ():
267
- if summary :
268
- thread_count , latest_thread_event , edit = summary
269
- results .setdefault (
270
- event_id , BundledAggregations ()
271
- ).thread = _ThreadAggregation (
272
- latest_event = latest_thread_event ,
273
- latest_edit = edit ,
274
- count = thread_count ,
275
- # If there's a thread summary it must also exist in the
276
- # participated dictionary.
277
- current_user_participated = participated [event_id ],
278
- )
461
+ for event_id , thread in threads .items ():
462
+ results .setdefault (event_id , BundledAggregations ()).thread = thread
279
463
280
464
return results
0 commit comments