Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 793d03e

Browse files
Generate historic pagination token for /messages when no ?from token provided (#12370)
1 parent 573cd0f commit 793d03e

File tree

6 files changed

+27
-15
lines changed

6 files changed

+27
-15
lines changed

changelog.d/12370.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix `/messages` returning backfilled and [MSC2716](https://github.com/matrix-org/synapse/pull/12319) historic messages our of order.

synapse/handlers/pagination.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,14 @@ async def get_messages(
441441
if pagin_config.from_token:
442442
from_token = pagin_config.from_token
443443
else:
444-
from_token = self.hs.get_event_sources().get_current_token_for_pagination()
444+
from_token = (
445+
await self.hs.get_event_sources().get_current_token_for_pagination(
446+
room_id
447+
)
448+
)
449+
# We expect `/messages` to use historic pagination tokens by default but
450+
# `/messages` should still works with live tokens when manually provided.
451+
assert from_token.room_key.topological
445452

446453
if pagin_config.limit is None:
447454
# This shouldn't happen as we've set a default limit before this

synapse/handlers/room.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1444,8 +1444,8 @@ async def get_new_events(
14441444
def get_current_key(self) -> RoomStreamToken:
14451445
return self.store.get_room_max_token()
14461446

1447-
def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
1448-
return self.store.get_room_events_max_id(room_id)
1447+
def get_current_key_for_room(self, room_id: str) -> Awaitable[RoomStreamToken]:
1448+
return self.store.get_current_room_stream_token_for_room_id(room_id)
14491449

14501450

14511451
class ShutdownRoomResponse(TypedDict):

synapse/storage/databases/main/stream.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -748,21 +748,23 @@ def _f(txn):
748748
"get_room_event_before_stream_ordering", _f
749749
)
750750

751-
async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str:
752-
"""Returns the current token for rooms stream.
753-
754-
By default, it returns the current global stream token. Specifying a
755-
`room_id` causes it to return the current room specific topological
756-
token.
751+
async def get_current_room_stream_token_for_room_id(
752+
self, room_id: Optional[str] = None
753+
) -> RoomStreamToken:
754+
"""Returns the current position of the rooms stream.
755+
756+
By default, it returns a live token with the current global stream
757+
token. Specifying a `room_id` causes it to return a historic token with
758+
the room specific topological token.
757759
"""
758-
token = self.get_room_max_stream_ordering()
760+
stream_ordering = self.get_room_max_stream_ordering()
759761
if room_id is None:
760-
return "s%d" % (token,)
762+
return RoomStreamToken(None, stream_ordering)
761763
else:
762764
topo = await self.db_pool.runInteraction(
763765
"_get_max_topological_txn", self._get_max_topological_txn, room_id
764766
)
765-
return "t%d-%d" % (topo, token)
767+
return RoomStreamToken(topo, stream_ordering)
766768

767769
def get_stream_id_for_event_txn(
768770
self,

synapse/streams/events.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def get_current_token(self) -> StreamToken:
6969
)
7070
return token
7171

72-
def get_current_token_for_pagination(self) -> StreamToken:
72+
async def get_current_token_for_pagination(self, room_id: str) -> StreamToken:
7373
"""Get the current token for a given room to be used to paginate
7474
events.
7575
@@ -80,7 +80,7 @@ def get_current_token_for_pagination(self) -> StreamToken:
8080
The current token for pagination.
8181
"""
8282
token = StreamToken(
83-
room_key=self.sources.room.get_current_key(),
83+
room_key=await self.sources.room.get_current_key_for_room(room_id),
8484
presence_key=0,
8585
typing_key=0,
8686
receipt_key=0,

tests/storage/test_stream.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,9 @@ def prepare(self, reactor, clock, homeserver):
110110
def _filter_messages(self, filter: JsonDict) -> List[EventBase]:
111111
"""Make a request to /messages with a filter, returns the chunk of events."""
112112

113-
from_token = self.hs.get_event_sources().get_current_token_for_pagination()
113+
from_token = self.get_success(
114+
self.hs.get_event_sources().get_current_token_for_pagination(self.room_id)
115+
)
114116

115117
events, next_key = self.get_success(
116118
self.hs.get_datastores().main.paginate_room_events(

0 commit comments

Comments
 (0)