Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit ea70f1c

Browse files
authored
Various clean ups to room stream tokens. (#8423)
1 parent 8238b55 commit ea70f1c

File tree

16 files changed

+96
-76
lines changed

16 files changed

+96
-76
lines changed

changelog.d/8423.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Various refactors to simplify stream token handling.

synapse/events/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from unpaddedbase64 import encode_base64
2424

2525
from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
26-
from synapse.types import JsonDict
26+
from synapse.types import JsonDict, RoomStreamToken
2727
from synapse.util.caches import intern_dict
2828
from synapse.util.frozenutils import freeze
2929

@@ -118,8 +118,8 @@ def __init__(self, internal_metadata_dict: JsonDict):
118118
# XXX: These are set by StreamWorkerStore._set_before_and_after.
119119
# I'm pretty sure that these are never persisted to the database, so shouldn't
120120
# be here
121-
before = DictProperty("before") # type: str
122-
after = DictProperty("after") # type: str
121+
before = DictProperty("before") # type: RoomStreamToken
122+
after = DictProperty("after") # type: RoomStreamToken
123123
order = DictProperty("order") # type: Tuple[int, int]
124124

125125
def get_dict(self) -> JsonDict:

synapse/handlers/admin.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ async def export_user_data(self, user_id, writer):
153153
if not events:
154154
break
155155

156-
from_key = RoomStreamToken.parse(events[-1].internal_metadata.after)
156+
from_key = events[-1].internal_metadata.after
157157

158158
events = await filter_events_for_client(self.storage, user_id, events)
159159

synapse/handlers/device.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
from synapse.logging.opentracing import log_kv, set_tag, trace
3030
from synapse.metrics.background_process_metrics import run_as_background_process
3131
from synapse.types import (
32-
RoomStreamToken,
3332
StreamToken,
3433
get_domain_from_id,
3534
get_verify_key_from_cross_signing_key,
@@ -113,8 +112,7 @@ async def get_user_ids_changed(self, user_id: str, from_token: StreamToken):
113112

114113
set_tag("user_id", user_id)
115114
set_tag("from_token", from_token)
116-
now_room_id = self.store.get_room_max_stream_ordering()
117-
now_room_key = RoomStreamToken(None, now_room_id)
115+
now_room_key = self.store.get_room_max_token()
118116

119117
room_ids = await self.store.get_rooms_for_user(user_id)
120118

synapse/handlers/initial_sync.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,8 @@ async def _room_initial_sync_parted(
325325
if limit is None:
326326
limit = 10
327327

328-
stream_token = await self.store.get_stream_token_for_event(member_event_id)
328+
leave_position = await self.store.get_position_for_event(member_event_id)
329+
stream_token = leave_position.to_room_stream_token()
329330

330331
messages, token = await self.store.get_recent_events_for_room(
331332
room_id, limit=limit, end_token=stream_token

synapse/handlers/pagination.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from synapse.metrics.background_process_metrics import run_as_background_process
2626
from synapse.storage.state import StateFilter
2727
from synapse.streams.config import PaginationConfig
28-
from synapse.types import Requester, RoomStreamToken
28+
from synapse.types import Requester
2929
from synapse.util.async_helpers import ReadWriteLock
3030
from synapse.util.stringutils import random_string
3131
from synapse.visibility import filter_events_for_client
@@ -373,10 +373,9 @@ async def get_messages(
373373
# case "JOIN" would have been returned.
374374
assert member_event_id
375375

376-
leave_token_str = await self.store.get_topological_token_for_event(
376+
leave_token = await self.store.get_topological_token_for_event(
377377
member_event_id
378378
)
379-
leave_token = RoomStreamToken.parse(leave_token_str)
380379
assert leave_token.topological is not None
381380

382381
if leave_token.topological < curr_topo:

synapse/handlers/room.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,14 +1134,14 @@ async def get_new_events(
11341134
events[:] = events[:limit]
11351135

11361136
if events:
1137-
end_key = RoomStreamToken.parse(events[-1].internal_metadata.after)
1137+
end_key = events[-1].internal_metadata.after
11381138
else:
11391139
end_key = to_key
11401140

11411141
return (events, end_key)
11421142

11431143
def get_current_key(self) -> RoomStreamToken:
1144-
return RoomStreamToken(None, self.store.get_room_max_stream_ordering())
1144+
return self.store.get_room_max_token()
11451145

11461146
def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
11471147
return self.store.get_room_events_max_id(room_id)

synapse/handlers/sync.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,7 @@ async def _load_filtered_recents(
519519
if len(recents) > timeline_limit:
520520
limited = True
521521
recents = recents[-timeline_limit:]
522-
room_key = RoomStreamToken.parse(recents[0].internal_metadata.before)
522+
room_key = recents[0].internal_metadata.before
523523

524524
prev_batch_token = now_token.copy_and_replace("room_key", room_key)
525525

@@ -1595,16 +1595,24 @@ async def _get_rooms_changed(
15951595

15961596
if leave_events:
15971597
leave_event = leave_events[-1]
1598-
leave_stream_token = await self.store.get_stream_token_for_event(
1598+
leave_position = await self.store.get_position_for_event(
15991599
leave_event.event_id
16001600
)
1601-
leave_token = since_token.copy_and_replace(
1602-
"room_key", leave_stream_token
1603-
)
16041601

1605-
if since_token and since_token.is_after(leave_token):
1602+
# If the leave event happened before the since token then we
1603+
# bail.
1604+
if since_token and not leave_position.persisted_after(
1605+
since_token.room_key
1606+
):
16061607
continue
16071608

1609+
# We can safely convert the position of the leave event into a
1610+
# stream token as it'll only be used in the context of this
1611+
# room. (c.f. the docstring of `to_room_stream_token`).
1612+
leave_token = since_token.copy_and_replace(
1613+
"room_key", leave_position.to_room_stream_token()
1614+
)
1615+
16081616
# If this is an out of band message, like a remote invite
16091617
# rejection, we include it in the recents batch. Otherwise, we
16101618
# let _load_filtered_recents handle fetching the correct

synapse/notifier.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ def new_listener(self, token: StreamToken) -> _NotificationListener:
163163
"""
164164
# Immediately wake up stream if something has already happened
165165
# since their last token.
166-
if self.last_notified_token.is_after(token):
166+
if self.last_notified_token != token:
167167
return _NotificationListener(defer.succeed(self.current_token))
168168
else:
169169
return _NotificationListener(self.notify_deferred.observe())
@@ -470,7 +470,7 @@ async def get_events_for(
470470
async def check_for_updates(
471471
before_token: StreamToken, after_token: StreamToken
472472
) -> EventStreamResult:
473-
if not after_token.is_after(before_token):
473+
if after_token == before_token:
474474
return EventStreamResult([], (from_token, from_token))
475475

476476
events = [] # type: List[EventBase]

synapse/replication/tcp/client.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
EventsStreamEventRow,
3030
EventsStreamRow,
3131
)
32-
from synapse.types import PersistedEventPosition, RoomStreamToken, UserID
32+
from synapse.types import PersistedEventPosition, UserID
3333
from synapse.util.async_helpers import timeout_deferred
3434
from synapse.util.metrics import Measure
3535

@@ -152,9 +152,7 @@ async def on_rdata(
152152
if event.type == EventTypes.Member:
153153
extra_users = (UserID.from_string(event.state_key),)
154154

155-
max_token = RoomStreamToken(
156-
None, self.store.get_room_max_stream_ordering()
157-
)
155+
max_token = self.store.get_room_max_token()
158156
event_pos = PersistedEventPosition(instance_name, token)
159157
self.notifier.on_new_room_event(
160158
event, event_pos, max_token, extra_users

synapse/rest/admin/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ async def on_POST(self, request, room_id, event_id):
109109
if event.room_id != room_id:
110110
raise SynapseError(400, "Event is for wrong room.")
111111

112-
token = await self.store.get_topological_token_for_event(event_id)
112+
room_token = await self.store.get_topological_token_for_event(event_id)
113+
token = str(room_token)
113114

114115
logger.info("[purge] purging up to token %s (event_id %s)", token, event_id)
115116
elif "purge_up_to_ts" in body:

synapse/storage/databases/main/stream.py

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
- topological tokens: "t%d-%d", where the integers map to the topological
3636
and stream ordering columns respectively.
3737
"""
38-
3938
import abc
4039
import logging
4140
from collections import namedtuple
@@ -54,7 +53,7 @@
5453
)
5554
from synapse.storage.databases.main.events_worker import EventsWorkerStore
5655
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
57-
from synapse.types import Collection, RoomStreamToken
56+
from synapse.types import Collection, PersistedEventPosition, RoomStreamToken
5857
from synapse.util.caches.stream_change_cache import StreamChangeCache
5958

6059
if TYPE_CHECKING:
@@ -305,6 +304,9 @@ def get_room_max_stream_ordering(self) -> int:
305304
def get_room_min_stream_ordering(self) -> int:
306305
raise NotImplementedError()
307306

307+
def get_room_max_token(self) -> RoomStreamToken:
308+
return RoomStreamToken(None, self.get_room_max_stream_ordering())
309+
308310
async def get_room_events_stream_for_rooms(
309311
self,
310312
room_ids: Collection[str],
@@ -611,34 +613,36 @@ def get_stream_id_for_event_txn(
611613
allow_none=allow_none,
612614
)
613615

614-
async def get_stream_token_for_event(self, event_id: str) -> RoomStreamToken:
615-
"""The stream token for an event
616-
Args:
617-
event_id: The id of the event to look up a stream token for.
618-
Raises:
619-
StoreError if the event wasn't in the database.
620-
Returns:
621-
A stream token.
616+
async def get_position_for_event(self, event_id: str) -> PersistedEventPosition:
617+
"""Get the persisted position for an event
622618
"""
623-
stream_id = await self.get_stream_id_for_event(event_id)
624-
return RoomStreamToken(None, stream_id)
619+
row = await self.db_pool.simple_select_one(
620+
table="events",
621+
keyvalues={"event_id": event_id},
622+
retcols=("stream_ordering", "instance_name"),
623+
desc="get_position_for_event",
624+
)
625+
626+
return PersistedEventPosition(
627+
row["instance_name"] or "master", row["stream_ordering"]
628+
)
625629

626-
async def get_topological_token_for_event(self, event_id: str) -> str:
630+
async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToken:
627631
"""The stream token for an event
628632
Args:
629633
event_id: The id of the event to look up a stream token for.
630634
Raises:
631635
StoreError if the event wasn't in the database.
632636
Returns:
633-
A "t%d-%d" topological token.
637+
A `RoomStreamToken` topological token.
634638
"""
635639
row = await self.db_pool.simple_select_one(
636640
table="events",
637641
keyvalues={"event_id": event_id},
638642
retcols=("stream_ordering", "topological_ordering"),
639643
desc="get_topological_token_for_event",
640644
)
641-
return "t%d-%d" % (row["topological_ordering"], row["stream_ordering"])
645+
return RoomStreamToken(row["topological_ordering"], row["stream_ordering"])
642646

643647
async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
644648
"""Gets the topological token in a room after or at the given stream
@@ -687,8 +691,8 @@ def _set_before_and_after(
687691
else:
688692
topo = None
689693
internal = event.internal_metadata
690-
internal.before = str(RoomStreamToken(topo, stream - 1))
691-
internal.after = str(RoomStreamToken(topo, stream))
694+
internal.before = RoomStreamToken(topo, stream - 1)
695+
internal.after = RoomStreamToken(topo, stream)
692696
internal.order = (int(topo) if topo else 0, int(stream))
693697

694698
async def get_events_around(

synapse/storage/persist_events.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ async def persist_events(
229229
defer.gatherResults(deferreds, consumeErrors=True)
230230
)
231231

232-
return RoomStreamToken(None, self.main_store.get_current_events_token())
232+
return self.main_store.get_room_max_token()
233233

234234
async def persist_event(
235235
self, event: EventBase, context: EventContext, backfilled: bool = False
@@ -247,11 +247,10 @@ async def persist_event(
247247

248248
await make_deferred_yieldable(deferred)
249249

250-
max_persisted_id = self.main_store.get_current_events_token()
251250
event_stream_id = event.internal_metadata.stream_ordering
252251

253252
pos = PersistedEventPosition(self._instance_name, event_stream_id)
254-
return pos, RoomStreamToken(None, max_persisted_id)
253+
return pos, self.main_store.get_room_max_token()
255254

256255
def _maybe_start_persisting(self, room_id: str):
257256
async def persisting_queue(item):

synapse/types.py

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,18 @@ def parse_stream_token(cls, string: str) -> "RoomStreamToken":
413413
pass
414414
raise SynapseError(400, "Invalid token %r" % (string,))
415415

416+
def copy_and_advance(self, other: "RoomStreamToken") -> "RoomStreamToken":
417+
"""Return a new token such that if an event is after both this token and
418+
the other token, then it's after the returned token too.
419+
"""
420+
421+
if self.topological or other.topological:
422+
raise Exception("Can't advance topological tokens")
423+
424+
max_stream = max(self.stream, other.stream)
425+
426+
return RoomStreamToken(None, max_stream)
427+
416428
def as_tuple(self) -> Tuple[Optional[int], int]:
417429
return (self.topological, self.stream)
418430

@@ -458,31 +470,20 @@ def to_string(self):
458470
def room_stream_id(self):
459471
return self.room_key.stream
460472

461-
def is_after(self, other):
462-
"""Does this token contain events that the other doesn't?"""
463-
return (
464-
(other.room_stream_id < self.room_stream_id)
465-
or (int(other.presence_key) < int(self.presence_key))
466-
or (int(other.typing_key) < int(self.typing_key))
467-
or (int(other.receipt_key) < int(self.receipt_key))
468-
or (int(other.account_data_key) < int(self.account_data_key))
469-
or (int(other.push_rules_key) < int(self.push_rules_key))
470-
or (int(other.to_device_key) < int(self.to_device_key))
471-
or (int(other.device_list_key) < int(self.device_list_key))
472-
or (int(other.groups_key) < int(self.groups_key))
473-
)
474-
475473
def copy_and_advance(self, key, new_value) -> "StreamToken":
476474
"""Advance the given key in the token to a new value if and only if the
477475
new value is after the old value.
478476
"""
479-
new_token = self.copy_and_replace(key, new_value)
480477
if key == "room_key":
481-
new_id = new_token.room_stream_id
482-
old_id = self.room_stream_id
483-
else:
484-
new_id = int(getattr(new_token, key))
485-
old_id = int(getattr(self, key))
478+
new_token = self.copy_and_replace(
479+
"room_key", self.room_key.copy_and_advance(new_value)
480+
)
481+
return new_token
482+
483+
new_token = self.copy_and_replace(key, new_value)
484+
new_id = int(getattr(new_token, key))
485+
old_id = int(getattr(self, key))
486+
486487
if old_id < new_id:
487488
return new_token
488489
else:
@@ -509,6 +510,18 @@ class PersistedEventPosition:
509510
def persisted_after(self, token: RoomStreamToken) -> bool:
510511
return token.stream < self.stream
511512

513+
def to_room_stream_token(self) -> RoomStreamToken:
514+
"""Converts the position to a room stream token such that events
515+
persisted in the same room after this position will be after the
516+
returned `RoomStreamToken`.
517+
518+
Note: no guarantees are made about ordering w.r.t. events in other
519+
rooms.
520+
"""
521+
# Doing the naive thing satisfies the desired properties described in
522+
# the docstring.
523+
return RoomStreamToken(None, self.stream)
524+
512525

513526
class ThirdPartyInstanceID(
514527
namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id"))

tests/rest/client/v1/test_rooms.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -902,15 +902,15 @@ def test_room_messages_purge(self):
902902

903903
# Send a first message in the room, which will be removed by the purge.
904904
first_event_id = self.helper.send(self.room_id, "message 1")["event_id"]
905-
first_token = self.get_success(
906-
store.get_topological_token_for_event(first_event_id)
905+
first_token = str(
906+
self.get_success(store.get_topological_token_for_event(first_event_id))
907907
)
908908

909909
# Send a second message in the room, which won't be removed, and which we'll
910910
# use as the marker to purge events before.
911911
second_event_id = self.helper.send(self.room_id, "message 2")["event_id"]
912-
second_token = self.get_success(
913-
store.get_topological_token_for_event(second_event_id)
912+
second_token = str(
913+
self.get_success(store.get_topological_token_for_event(second_event_id))
914914
)
915915

916916
# Send a third event in the room to ensure we don't fall under any edge case

0 commit comments

Comments
 (0)