Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit b9958ed

Browse files
committed
Add order_by argument for paginating room events
This can be used to flip betweek topological ordering (the default) and stream ordering as needed by the caller.
1 parent 831d479 commit b9958ed

File tree

1 file changed

+36
-6
lines changed

1 file changed

+36
-6
lines changed

synapse/storage/databases/main/stream.py

+36-6
Original file line numberDiff line numberDiff line change
@@ -652,22 +652,31 @@ def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
652652
return ret
653653

654654
async def get_recent_events_for_room(
655-
self, room_id: str, limit: int, end_token: RoomStreamToken
655+
self,
656+
room_id: str,
657+
limit: int,
658+
end_token: RoomStreamToken,
659+
order_by: str = "topological",
656660
) -> Tuple[List[EventBase], RoomStreamToken]:
657661
"""Get the most recent events in the room in topological ordering.
658662
659663
Args:
660664
room_id
661665
limit
662666
end_token: The stream token representing now.
667+
order_by: Either 'topological' or 'stream' to indicate the order in
668+
which results should be returned.
663669
664670
Returns:
665671
A list of events and a token pointing to the start of the returned
666672
events. The events returned are in ascending topological order.
667673
"""
668674

669675
rows, token = await self.get_recent_event_ids_for_room(
670-
room_id, limit, end_token
676+
room_id,
677+
limit,
678+
end_token,
679+
order_by,
671680
)
672681

673682
events = await self.get_events_as_list(
@@ -679,14 +688,20 @@ async def get_recent_events_for_room(
679688
return events, token
680689

681690
async def get_recent_event_ids_for_room(
682-
self, room_id: str, limit: int, end_token: RoomStreamToken
691+
self,
692+
room_id: str,
693+
limit: int,
694+
end_token: RoomStreamToken,
695+
order_by: str = "topological",
683696
) -> Tuple[List[_EventDictReturn], RoomStreamToken]:
684697
"""Get the most recent events in the room in topological ordering.
685698
686699
Args:
687700
room_id
688701
limit
689702
end_token: The stream token representing now.
703+
order_by: Either 'topological' or 'stream' to indicate the order in
704+
which results should be returned.
690705
691706
Returns:
692707
A list of _EventDictReturn and a token pointing to the start of the
@@ -701,6 +716,7 @@ async def get_recent_event_ids_for_room(
701716
self._paginate_room_events_txn,
702717
room_id,
703718
from_token=end_token,
719+
order_by=order_by,
704720
limit=limit,
705721
)
706722

@@ -1099,6 +1115,7 @@ def _paginate_room_events_txn(
10991115
from_token: RoomStreamToken,
11001116
to_token: Optional[RoomStreamToken] = None,
11011117
direction: str = "b",
1118+
order_by: str = "topological",
11021119
limit: int = -1,
11031120
event_filter: Optional[Filter] = None,
11041121
) -> Tuple[List[_EventDictReturn], RoomStreamToken]:
@@ -1111,6 +1128,8 @@ def _paginate_room_events_txn(
11111128
to_token: A token which if given limits the results to only those before
11121129
direction: Either 'b' or 'f' to indicate whether we are paginating
11131130
forwards or backwards from `from_key`.
1131+
order_by: Either 'topological' or 'stream' to indicate the order in
1132+
which results should be returned.
11141133
limit: The maximum number of events to return.
11151134
event_filter: If provided filters the events to
11161135
those that match the filter.
@@ -1123,6 +1142,7 @@ def _paginate_room_events_txn(
11231142
"""
11241143

11251144
assert int(limit) >= 0
1145+
assert order_by in ("topological", "stream")
11261146

11271147
# Tokens really represent positions between elements, but we use
11281148
# the convention of pointing to the event before the gap. Hence
@@ -1133,6 +1153,12 @@ def _paginate_room_events_txn(
11331153
else:
11341154
order = "ASC"
11351155

1156+
order_clause = """ORDER BY event.topological_ordering %(order)s, event.stream_ordering %(order)s"""
1157+
if order_by == "stream":
1158+
order_clause = """ORDER BY event.stream_ordering %(order)s, event.topological_ordering %(order)s"""
1159+
1160+
order_clause = order_clause % {"order": order}
1161+
11361162
# The bounds for the stream tokens are complicated by the fact
11371163
# that we need to handle the instance_map part of the tokens. We do this
11381164
# by fetching all events between the min stream token and the maximum
@@ -1228,13 +1254,13 @@ def _paginate_room_events_txn(
12281254
FROM events AS event
12291255
%(join_clause)s
12301256
WHERE event.outlier = ? AND event.room_id = ? AND %(bounds)s
1231-
ORDER BY event.topological_ordering %(order)s,
1232-
event.stream_ordering %(order)s LIMIT ?
1257+
%(order_clause)s
1258+
LIMIT ?
12331259
""" % {
12341260
"select_keywords": select_keywords,
12351261
"join_clause": join_clause,
12361262
"bounds": bounds,
1237-
"order": order,
1263+
"order_clause": order_clause,
12381264
}
12391265

12401266
txn.execute(sql, args)
@@ -1275,6 +1301,7 @@ async def paginate_room_events(
12751301
from_key: RoomStreamToken,
12761302
to_key: Optional[RoomStreamToken] = None,
12771303
direction: str = "b",
1304+
order_by: str = "topological",
12781305
limit: int = -1,
12791306
event_filter: Optional[Filter] = None,
12801307
) -> Tuple[List[EventBase], RoomStreamToken]:
@@ -1286,6 +1313,8 @@ async def paginate_room_events(
12861313
to_key: A token which if given limits the results to only those before
12871314
direction: Either 'b' or 'f' to indicate whether we are paginating
12881315
forwards or backwards from `from_key`.
1316+
order_by: Either 'topological' or 'stream' to indicate the order in
1317+
which results should be returned.
12891318
limit: The maximum number of events to return.
12901319
event_filter: If provided filters the events to those that match the filter.
12911320
@@ -1303,6 +1332,7 @@ async def paginate_room_events(
13031332
from_key,
13041333
to_key,
13051334
direction,
1335+
order_by,
13061336
limit,
13071337
event_filter,
13081338
)

0 commit comments

Comments
 (0)