Skip to content

Commit cb37198

Browse files
MadLittleModsMic92
authored andcommitted
Add Sliding Sync /sync/e2ee endpoint for To-Device messages (element-hq#17167)
This is being introduced as part of Sliding Sync but doesn't have any sliding window component. It's just a way to get E2EE events without having to sit through a big initial sync (`/sync` v2). And we can avoid encryption events being backed up by the main sync response or vice-versa. Part of some Sliding Sync simplification/experimentation. See [this discussion](element-hq#17167 (comment)) for why it may not be as useful as we thought. Based on: - matrix-org/matrix-spec-proposals#3575 - matrix-org/matrix-spec-proposals#3885 - matrix-org/matrix-spec-proposals#3884
1 parent 48de9d6 commit cb37198

File tree

7 files changed

+861
-175
lines changed

7 files changed

+861
-175
lines changed

Diff for: changelog.d/17167.feature

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync/e2ee` endpoint for To-Device messages and device encryption info.

Diff for: synapse/config/experimental.py

+3
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,9 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
332332
# MSC3391: Removing account data.
333333
self.msc3391_enabled = experimental.get("msc3391_enabled", False)
334334

335+
# MSC3575 (Sliding Sync API endpoints)
336+
self.msc3575_enabled: bool = experimental.get("msc3575_enabled", False)
337+
335338
# MSC3773: Thread notifications
336339
self.msc3773_enabled: bool = experimental.get("msc3773_enabled", False)
337340

Diff for: synapse/handlers/sync.py

+237-10
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,14 @@
2828
Dict,
2929
FrozenSet,
3030
List,
31+
Literal,
3132
Mapping,
3233
Optional,
3334
Sequence,
3435
Set,
3536
Tuple,
37+
Union,
38+
overload,
3639
)
3740

3841
import attr
@@ -128,6 +131,8 @@ class SyncVersion(Enum):
128131

129132
# Traditional `/sync` endpoint
130133
SYNC_V2 = "sync_v2"
134+
# Part of MSC3575 Sliding Sync
135+
E2EE_SYNC = "e2ee_sync"
131136

132137

133138
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -280,6 +285,26 @@ def __bool__(self) -> bool:
280285
)
281286

282287

288+
@attr.s(slots=True, frozen=True, auto_attribs=True)
289+
class E2eeSyncResult:
290+
"""
291+
Attributes:
292+
next_batch: Token for the next sync
293+
to_device: List of direct messages for the device.
294+
device_lists: List of user_ids whose devices have changed
295+
device_one_time_keys_count: Dict of algorithm to count for one time keys
296+
for this device
297+
device_unused_fallback_key_types: List of key types that have an unused fallback
298+
key
299+
"""
300+
301+
next_batch: StreamToken
302+
to_device: List[JsonDict]
303+
device_lists: DeviceListUpdates
304+
device_one_time_keys_count: JsonMapping
305+
device_unused_fallback_key_types: List[str]
306+
307+
283308
class SyncHandler:
284309
def __init__(self, hs: "HomeServer"):
285310
self.hs_config = hs.config
@@ -322,6 +347,31 @@ def __init__(self, hs: "HomeServer"):
322347

323348
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
324349

350+
@overload
351+
async def wait_for_sync_for_user(
352+
self,
353+
requester: Requester,
354+
sync_config: SyncConfig,
355+
sync_version: Literal[SyncVersion.SYNC_V2],
356+
request_key: SyncRequestKey,
357+
since_token: Optional[StreamToken] = None,
358+
timeout: int = 0,
359+
full_state: bool = False,
360+
) -> SyncResult: ...
361+
362+
@overload
363+
async def wait_for_sync_for_user(
364+
self,
365+
requester: Requester,
366+
sync_config: SyncConfig,
367+
sync_version: Literal[SyncVersion.E2EE_SYNC],
368+
request_key: SyncRequestKey,
369+
since_token: Optional[StreamToken] = None,
370+
timeout: int = 0,
371+
full_state: bool = False,
372+
) -> E2eeSyncResult: ...
373+
374+
@overload
325375
async def wait_for_sync_for_user(
326376
self,
327377
requester: Requester,
@@ -331,7 +381,18 @@ async def wait_for_sync_for_user(
331381
since_token: Optional[StreamToken] = None,
332382
timeout: int = 0,
333383
full_state: bool = False,
334-
) -> SyncResult:
384+
) -> Union[SyncResult, E2eeSyncResult]: ...
385+
386+
async def wait_for_sync_for_user(
387+
self,
388+
requester: Requester,
389+
sync_config: SyncConfig,
390+
sync_version: SyncVersion,
391+
request_key: SyncRequestKey,
392+
since_token: Optional[StreamToken] = None,
393+
timeout: int = 0,
394+
full_state: bool = False,
395+
) -> Union[SyncResult, E2eeSyncResult]:
335396
"""Get the sync for a client if we have new data for it now. Otherwise
336397
wait for new data to arrive on the server. If the timeout expires, then
337398
return an empty sync result.
@@ -344,8 +405,10 @@ async def wait_for_sync_for_user(
344405
since_token: The point in the stream to sync from.
345406
timeout: How long to wait for new data to arrive before giving up.
346407
full_state: Whether to return the full state for each room.
408+
347409
Returns:
348410
When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
411+
When `SyncVersion.E2EE_SYNC`, returns a `E2eeSyncResult`.
349412
"""
350413
# If the user is not part of the mau group, then check that limits have
351414
# not been exceeded (if not part of the group by this point, almost certain
@@ -366,6 +429,29 @@ async def wait_for_sync_for_user(
366429
logger.debug("Returning sync response for %s", user_id)
367430
return res
368431

432+
@overload
433+
async def _wait_for_sync_for_user(
434+
self,
435+
sync_config: SyncConfig,
436+
sync_version: Literal[SyncVersion.SYNC_V2],
437+
since_token: Optional[StreamToken],
438+
timeout: int,
439+
full_state: bool,
440+
cache_context: ResponseCacheContext[SyncRequestKey],
441+
) -> SyncResult: ...
442+
443+
@overload
444+
async def _wait_for_sync_for_user(
445+
self,
446+
sync_config: SyncConfig,
447+
sync_version: Literal[SyncVersion.E2EE_SYNC],
448+
since_token: Optional[StreamToken],
449+
timeout: int,
450+
full_state: bool,
451+
cache_context: ResponseCacheContext[SyncRequestKey],
452+
) -> E2eeSyncResult: ...
453+
454+
@overload
369455
async def _wait_for_sync_for_user(
370456
self,
371457
sync_config: SyncConfig,
@@ -374,7 +460,17 @@ async def _wait_for_sync_for_user(
374460
timeout: int,
375461
full_state: bool,
376462
cache_context: ResponseCacheContext[SyncRequestKey],
377-
) -> SyncResult:
463+
) -> Union[SyncResult, E2eeSyncResult]: ...
464+
465+
async def _wait_for_sync_for_user(
466+
self,
467+
sync_config: SyncConfig,
468+
sync_version: SyncVersion,
469+
since_token: Optional[StreamToken],
470+
timeout: int,
471+
full_state: bool,
472+
cache_context: ResponseCacheContext[SyncRequestKey],
473+
) -> Union[SyncResult, E2eeSyncResult]:
378474
"""The start of the machinery that produces a /sync response.
379475
380476
See https://spec.matrix.org/v1.1/client-server-api/#syncing for full details.
@@ -417,14 +513,16 @@ async def _wait_for_sync_for_user(
417513
if timeout == 0 or since_token is None or full_state:
418514
# we are going to return immediately, so don't bother calling
419515
# notifier.wait_for_events.
420-
result: SyncResult = await self.current_sync_for_user(
421-
sync_config, sync_version, since_token, full_state=full_state
516+
result: Union[SyncResult, E2eeSyncResult] = (
517+
await self.current_sync_for_user(
518+
sync_config, sync_version, since_token, full_state=full_state
519+
)
422520
)
423521
else:
424522
# Otherwise, we wait for something to happen and report it to the user.
425523
async def current_sync_callback(
426524
before_token: StreamToken, after_token: StreamToken
427-
) -> SyncResult:
525+
) -> Union[SyncResult, E2eeSyncResult]:
428526
return await self.current_sync_for_user(
429527
sync_config, sync_version, since_token
430528
)
@@ -456,14 +554,43 @@ async def current_sync_callback(
456554

457555
return result
458556

557+
@overload
558+
async def current_sync_for_user(
559+
self,
560+
sync_config: SyncConfig,
561+
sync_version: Literal[SyncVersion.SYNC_V2],
562+
since_token: Optional[StreamToken] = None,
563+
full_state: bool = False,
564+
) -> SyncResult: ...
565+
566+
@overload
567+
async def current_sync_for_user(
568+
self,
569+
sync_config: SyncConfig,
570+
sync_version: Literal[SyncVersion.E2EE_SYNC],
571+
since_token: Optional[StreamToken] = None,
572+
full_state: bool = False,
573+
) -> E2eeSyncResult: ...
574+
575+
@overload
459576
async def current_sync_for_user(
460577
self,
461578
sync_config: SyncConfig,
462579
sync_version: SyncVersion,
463580
since_token: Optional[StreamToken] = None,
464581
full_state: bool = False,
465-
) -> SyncResult:
466-
"""Generates the response body of a sync result, represented as a SyncResult.
582+
) -> Union[SyncResult, E2eeSyncResult]: ...
583+
584+
async def current_sync_for_user(
585+
self,
586+
sync_config: SyncConfig,
587+
sync_version: SyncVersion,
588+
since_token: Optional[StreamToken] = None,
589+
full_state: bool = False,
590+
) -> Union[SyncResult, E2eeSyncResult]:
591+
"""
592+
Generates the response body of a sync result, represented as a
593+
`SyncResult`/`E2eeSyncResult`.
467594
468595
This is a wrapper around `generate_sync_result` which starts an open tracing
469596
span to track the sync. See `generate_sync_result` for the next part of your
@@ -474,15 +601,25 @@ async def current_sync_for_user(
474601
sync_version: Determines what kind of sync response to generate.
475602
since_token: The point in the stream to sync from.p.
476603
full_state: Whether to return the full state for each room.
604+
477605
Returns:
478606
When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
607+
When `SyncVersion.E2EE_SYNC`, returns a `E2eeSyncResult`.
479608
"""
480609
with start_active_span("sync.current_sync_for_user"):
481610
log_kv({"since_token": since_token})
611+
482612
# Go through the `/sync` v2 path
483613
if sync_version == SyncVersion.SYNC_V2:
484-
sync_result: SyncResult = await self.generate_sync_result(
485-
sync_config, since_token, full_state
614+
sync_result: Union[SyncResult, E2eeSyncResult] = (
615+
await self.generate_sync_result(
616+
sync_config, since_token, full_state
617+
)
618+
)
619+
# Go through the MSC3575 Sliding Sync `/sync/e2ee` path
620+
elif sync_version == SyncVersion.E2EE_SYNC:
621+
sync_result = await self.generate_e2ee_sync_result(
622+
sync_config, since_token
486623
)
487624
else:
488625
raise Exception(
@@ -1691,6 +1828,96 @@ async def generate_sync_result(
16911828
next_batch=sync_result_builder.now_token,
16921829
)
16931830

1831+
async def generate_e2ee_sync_result(
1832+
self,
1833+
sync_config: SyncConfig,
1834+
since_token: Optional[StreamToken] = None,
1835+
) -> E2eeSyncResult:
1836+
"""
1837+
Generates the response body of a MSC3575 Sliding Sync `/sync/e2ee` result.
1838+
1839+
This is represented by a `E2eeSyncResult` struct, which is built from small
1840+
pieces using a `SyncResultBuilder`. The `sync_result_builder` is passed as a
1841+
mutable ("inout") parameter to various helper functions. These retrieve and
1842+
process the data which forms the sync body, often writing to the
1843+
`sync_result_builder` to store their output.
1844+
1845+
At the end, we transfer data from the `sync_result_builder` to a new `E2eeSyncResult`
1846+
instance to signify that the sync calculation is complete.
1847+
"""
1848+
user_id = sync_config.user.to_string()
1849+
app_service = self.store.get_app_service_by_user_id(user_id)
1850+
if app_service:
1851+
# We no longer support AS users using /sync directly.
1852+
# See https://github.com/matrix-org/matrix-doc/issues/1144
1853+
raise NotImplementedError()
1854+
1855+
sync_result_builder = await self.get_sync_result_builder(
1856+
sync_config,
1857+
since_token,
1858+
full_state=False,
1859+
)
1860+
1861+
# 1. Calculate `to_device` events
1862+
await self._generate_sync_entry_for_to_device(sync_result_builder)
1863+
1864+
# 2. Calculate `device_lists`
1865+
# Device list updates are sent if a since token is provided.
1866+
device_lists = DeviceListUpdates()
1867+
include_device_list_updates = bool(since_token and since_token.device_list_key)
1868+
if include_device_list_updates:
1869+
# Note that _generate_sync_entry_for_rooms sets sync_result_builder.joined, which
1870+
# is used in calculate_user_changes below.
1871+
#
1872+
# TODO: Running `_generate_sync_entry_for_rooms()` is a lot of work just to
1873+
# figure out the membership changes/derived info needed for
1874+
# `_generate_sync_entry_for_device_list()`. In the future, we should try to
1875+
# refactor this away.
1876+
(
1877+
newly_joined_rooms,
1878+
newly_left_rooms,
1879+
) = await self._generate_sync_entry_for_rooms(sync_result_builder)
1880+
1881+
# This uses the sync_result_builder.joined which is set in
1882+
# `_generate_sync_entry_for_rooms`, if that didn't find any joined
1883+
# rooms for some reason it is a no-op.
1884+
(
1885+
newly_joined_or_invited_or_knocked_users,
1886+
newly_left_users,
1887+
) = sync_result_builder.calculate_user_changes()
1888+
1889+
device_lists = await self._generate_sync_entry_for_device_list(
1890+
sync_result_builder,
1891+
newly_joined_rooms=newly_joined_rooms,
1892+
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
1893+
newly_left_rooms=newly_left_rooms,
1894+
newly_left_users=newly_left_users,
1895+
)
1896+
1897+
# 3. Calculate `device_one_time_keys_count` and `device_unused_fallback_key_types`
1898+
device_id = sync_config.device_id
1899+
one_time_keys_count: JsonMapping = {}
1900+
unused_fallback_key_types: List[str] = []
1901+
if device_id:
1902+
# TODO: We should have a way to let clients differentiate between the states of:
1903+
# * no change in OTK count since the provided since token
1904+
# * the server has zero OTKs left for this device
1905+
# Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
1906+
one_time_keys_count = await self.store.count_e2e_one_time_keys(
1907+
user_id, device_id
1908+
)
1909+
unused_fallback_key_types = list(
1910+
await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
1911+
)
1912+
1913+
return E2eeSyncResult(
1914+
to_device=sync_result_builder.to_device,
1915+
device_lists=device_lists,
1916+
device_one_time_keys_count=one_time_keys_count,
1917+
device_unused_fallback_key_types=unused_fallback_key_types,
1918+
next_batch=sync_result_builder.now_token,
1919+
)
1920+
16941921
async def get_sync_result_builder(
16951922
self,
16961923
sync_config: SyncConfig,
@@ -1889,7 +2116,7 @@ async def _generate_sync_entry_for_device_list(
18892116
users_that_have_changed = (
18902117
await self._device_handler.get_device_changes_in_shared_rooms(
18912118
user_id,
1892-
sync_result_builder.joined_room_ids,
2119+
joined_room_ids,
18932120
from_token=since_token,
18942121
now_token=sync_result_builder.now_token,
18952122
)

0 commit comments

Comments
 (0)