22
22
logger = logging .getLogger (__name__ )
23
23
24
24
25
- # Everything except `Membership.LEAVE`
25
+ # Everything except `Membership.LEAVE` because we want everything that's *still*
26
+ # relevant to the user.
26
27
MEMBERSHIP_TO_DISPLAY_IN_SYNC = (
27
28
Membership .INVITE ,
28
29
Membership .JOIN ,
@@ -305,13 +306,19 @@ async def get_sync_room_ids_for_user(
305
306
from_token : Optional [StreamToken ] = None ,
306
307
) -> AbstractSet [str ]:
307
308
"""
308
- Fetch room IDs that should be listed for this user in the sync response.
309
+ Fetch room IDs that should be listed for this user in the sync response (the
310
+ full room list that will be sliced, filtered, sorted).
309
311
310
312
We're looking for rooms that the user has not left (`invite`, `knock`, `join`,
311
313
and `ban`) or newly_left rooms that are > `from_token` and <= `to_token`.
312
314
"""
313
315
user_id = user .to_string ()
314
316
317
+ # For a sync without from_token, all rooms except leave
318
+
319
+ # For incremental syncs with a from_token, we only need rooms that have changes
320
+ # (some event occured).
321
+
315
322
# First grab a current snapshot rooms for the user
316
323
room_for_user_list = await self .store .get_rooms_for_local_user_where_membership_is (
317
324
user_id = user_id ,
@@ -333,11 +340,16 @@ async def get_sync_room_ids_for_user(
333
340
sync_room_id_set = {
334
341
room_for_user .room_id
335
342
for room_for_user in room_for_user_list
343
+ # TODO: Include kicks (leave where sender is not the user itself)
336
344
if room_for_user .membership in MEMBERSHIP_TO_DISPLAY_IN_SYNC
337
345
}
338
346
339
347
# Find the stream_ordering of the latest room membership event which will mark
340
348
# the spot we queried up to.
349
+ #
350
+ # TODO: With the new `GetRoomsForUserWithStreamOrdering` info, make a instance
351
+ # map to stream ordering and construct the new room key from that map,
352
+ # `RoomStreamToken(stream=<min in that map>, instance_map=...)`
341
353
max_stream_ordering_from_room_list = max (
342
354
room_for_user .stream_ordering for room_for_user in room_for_user_list
343
355
)
@@ -348,11 +360,31 @@ async def get_sync_room_ids_for_user(
348
360
if max_stream_ordering_from_room_list <= to_token .room_key .stream :
349
361
return sync_room_id_set
350
362
363
+ # ~~Of the membership events we pulled out, there still might be events that fail
364
+ # that conditional~~
365
+ #
366
+ # ~~We can get past the conditional above even though we might have fetched events~~
367
+ #
368
+ # Each event has an stream ID and instance. We can ask
369
+ #
370
+ # Multiple event_persisters
371
+ #
372
+ # For every event (GetRoomsForUserWithStreamOrdering) compare with
373
+ # `persisted_after` or add a new function to MultiWriterStreamToken to do the
374
+ # same thing.
375
+
376
+ # When you compare tokens, it could be any of these scenarios
377
+ # - Token A <= Token B (every stream pos is lower than the other token)
378
+ # - Token A >= Token B
379
+ # - It's indeterminate (intertwined, v_1_2, v2_1, both before/after each other)
380
+
351
381
# We assume the `from_token` is before or at-least equal to the `to_token`
352
382
assert (
353
383
from_token is None or from_token .room_key .stream <= to_token .room_key .stream
354
384
), f"{ from_token .room_key .stream if from_token else None } <= { to_token .room_key .stream } "
355
385
386
+ # We need to `wait_for_stream_token`, when they provide a token
387
+
356
388
# We assume the `from_token`/`to_token` is before the `max_stream_ordering_from_room_list`
357
389
assert (
358
390
from_token is None
@@ -369,10 +401,17 @@ async def get_sync_room_ids_for_user(
369
401
# - 1) Add back newly_left rooms (> `from_token` and <= `to_token`)
370
402
# - 2a) Remove rooms that the user joined after the `to_token`
371
403
# - 2b) Add back rooms that the user left after the `to_token`
404
+ #
405
+ # TODO: Split this into two separate lookups (from_token.room_key ->
406
+ # to_token.room_key) and (to_token.room_key -> RoomStreamToken(...)) to avoid
407
+ # needing to do raw stream comparison below since we don't have access to the
408
+ # `instance_name` that persisted that event. We could refactor
409
+ # `event.internal_metadata` to include it but it might turn out a little
410
+ # difficult and a bigger, broader Synapse change than we want to make.
372
411
membership_change_events = await self .store .get_membership_changes_for_user (
373
412
user_id ,
374
- # Start from the `from_token` if given, otherwise from the `to_token` so we
375
- # can still do the 2) fixups.
413
+ # Start from the `from_token` if given for the 1) fixups , otherwise from the
414
+ # `to_token` so we can still do the 2) fixups.
376
415
from_key = from_token .room_key if from_token else to_token .room_key ,
377
416
# Fetch up to our membership snapshot
378
417
to_key = RoomStreamToken (stream = max_stream_ordering_from_room_list ),
@@ -390,6 +429,7 @@ async def get_sync_room_ids_for_user(
390
429
for event in membership_change_events :
391
430
assert event .internal_metadata .stream_ordering
392
431
432
+ # TODO: Compare with instance_name/stream_ordering
393
433
if (
394
434
(
395
435
from_token is None
@@ -457,6 +497,17 @@ async def get_sync_room_ids_for_user(
457
497
+ "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`"
458
498
+ "/`first_membership_change_by_room_id_after_to_token` dicts above."
459
499
)
500
+ # TODO: Instead of reading from `unsigned`, refactor this to use the
501
+ # `current_state_delta_stream` table in the future. Probably a new
502
+ # `get_membership_changes_for_user()` function that uses
503
+ # `current_state_delta_stream` with a join to `room_memberships`. This would
504
+ # help in state reset scenarios since `prev_content` is looking at the
505
+ # current branch vs the current room state. This is all just data given to
506
+ # the client so no real harm to data integrity, but we'd like to be nice to
507
+ # the client. Since the `current_state_delta_stream` table is new, it
508
+ # doesn't have all events in it. Since this is Sliding Sync, if we ever need
509
+ # to, we can signal the client to throw all of their state away by sending
510
+ # "operation: RESET".
460
511
prev_content = first_membership_change_after_to_token .unsigned .get (
461
512
"prev_content" , {}
462
513
)
0 commit comments