Skip to content

Commit be4a16f

Browse files
Sliding Sync: Track whether we have sent rooms down to clients (#17447)
The basic idea is that we introduce a new token for a sliding sync connection, which stores the mapping of room to room "status" (i.e. have we sent the room down?). This token allows us to handle duplicate requests properly. In future it can be used to store more "per-connection" information safely. In future this should be migrated into the DB, so its important that we try to reduce the number of syncs where we need to update the per-connection information. In this PoC this only happens when we: a) send down a set of room for the first time, or b) we have previously sent down a room and there are updates but we are not sending the room down the sync (due to not falling in a list range) Co-authored-by: Eric Eastwood <[email protected]>
1 parent 568051c commit be4a16f

File tree

9 files changed

+814
-45
lines changed

9 files changed

+814
-45
lines changed

changelog.d/17447.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Track which rooms have been sent to clients in the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,8 @@ netaddr = ">=0.7.18"
201201
# add a lower bound to the Jinja2 dependency.
202202
Jinja2 = ">=3.0"
203203
bleach = ">=1.4.3"
204-
# We use `Self`, which were added in `typing-extensions` 4.0.
205-
typing-extensions = ">=4.0"
204+
# We use `assert_never`, which were added in `typing-extensions` 4.1.
205+
typing-extensions = ">=4.1"
206206
# We enforce that we have a `cryptography` version that bundles an `openssl`
207207
# with the latest security patches.
208208
cryptography = ">=3.4.7"

synapse/handlers/sliding_sync.py

Lines changed: 314 additions & 36 deletions
Large diffs are not rendered by default.

synapse/rest/client/sync.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -881,7 +881,6 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
881881
)
882882

883883
user = requester.user
884-
device_id = requester.device_id
885884

886885
timeout = parse_integer(request, "timeout", default=0)
887886
# Position in the stream
@@ -902,11 +901,12 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
902901

903902
sync_config = SlidingSyncConfig(
904903
user=user,
905-
device_id=device_id,
904+
requester=requester,
906905
# FIXME: Currently, we're just manually copying the fields from the
907-
# `SlidingSyncBody` into the config. How can we gurantee into the future
906+
# `SlidingSyncBody` into the config. How can we guarantee into the future
908907
# that we don't forget any? I would like something more structured like
909908
# `copy_attributes(from=body, to=config)`
909+
conn_id=body.conn_id,
910910
lists=body.lists,
911911
room_subscriptions=body.room_subscriptions,
912912
extensions=body.extensions,

synapse/server.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,7 @@ def get_jwt_handler(self) -> "JwtHandler":
559559
def get_sync_handler(self) -> SyncHandler:
560560
return SyncHandler(self)
561561

562+
@cache_in_self
562563
def get_sliding_sync_handler(self) -> SlidingSyncHandler:
563564
return SlidingSyncHandler(self)
564565

synapse/storage/databases/main/state_deltas.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626

2727
from synapse.storage._base import SQLBaseStore
2828
from synapse.storage.database import LoggingTransaction
29+
from synapse.storage.databases.main.stream import _filter_results_by_stream
30+
from synapse.types import RoomStreamToken
2931
from synapse.util.caches.stream_change_cache import StreamChangeCache
3032

3133
logger = logging.getLogger(__name__)
@@ -156,3 +158,38 @@ async def get_max_stream_id_in_current_state_deltas(self) -> int:
156158
"get_max_stream_id_in_current_state_deltas",
157159
self._get_max_stream_id_in_current_state_deltas_txn,
158160
)
161+
162+
async def get_current_state_deltas_for_room(
163+
self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
164+
) -> List[StateDelta]:
165+
"""Get the state deltas between two tokens."""
166+
167+
def get_current_state_deltas_for_room_txn(
168+
txn: LoggingTransaction,
169+
) -> List[StateDelta]:
170+
sql = """
171+
SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
172+
FROM current_state_delta_stream
173+
WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
174+
ORDER BY stream_id ASC
175+
"""
176+
txn.execute(
177+
sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
178+
)
179+
180+
return [
181+
StateDelta(
182+
stream_id=row[1],
183+
room_id=room_id,
184+
event_type=row[2],
185+
state_key=row[3],
186+
event_id=row[4],
187+
prev_event_id=row[5],
188+
)
189+
for row in txn
190+
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
191+
]
192+
193+
return await self.db_pool.runInteraction(
194+
"get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
195+
)

synapse/types/handlers/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
DeviceListUpdates,
3636
JsonDict,
3737
JsonMapping,
38+
Requester,
3839
SlidingSyncStreamToken,
3940
StreamToken,
4041
UserID,
@@ -109,7 +110,7 @@ class SlidingSyncConfig(SlidingSyncBody):
109110
"""
110111

111112
user: UserID
112-
device_id: Optional[str]
113+
requester: Requester
113114

114115
# Pydantic config
115116
class Config:

synapse/types/rest/client/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ class SlidingSyncBody(RequestBodyModel):
120120
Sliding Sync API request body.
121121
122122
Attributes:
123+
conn_id: An optional string to identify this connection to the server.
124+
Only one sliding sync connection is allowed per given conn_id (empty
125+
or not).
123126
lists: Sliding window API. A map of list key to list information
124127
(:class:`SlidingSyncList`). Max lists: 100. The list keys should be
125128
arbitrary strings which the client is using to refer to the list. Keep this
@@ -343,6 +346,8 @@ class AccountDataExtension(RequestBodyModel):
343346
e2ee: Optional[E2eeExtension] = None
344347
account_data: Optional[AccountDataExtension] = None
345348

349+
conn_id: Optional[str]
350+
346351
# mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
347352
if TYPE_CHECKING:
348353
lists: Optional[Dict[str, SlidingSyncList]] = None

0 commit comments

Comments
 (0)