Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 22abfca

Browse files
Fix a bug introduced in Synapse v1.0.0 whereby device list updates would not be sent to remote homeservers if there were too many to send at once. (#11729)
Co-authored-by: Brendan Abolivier <[email protected]>
1 parent 1b1aed3 commit 22abfca

File tree

3 files changed

+57
-2
lines changed

3 files changed

+57
-2
lines changed

changelog.d/11729.bugfix

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a bug introduced in Synapse v1.0.0 whereby some device list updates would not be sent to remote homeservers if there were too many to send at once.

synapse/storage/databases/main/devices.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,10 @@ async def get_device_updates_by_remote(
270270
# The most recent request's opentracing_context is used as the
271271
# context which created the Edu.
272272

273+
# This is the stream ID that we will return for the consumer to resume
274+
# following this stream later.
275+
last_processed_stream_id = from_stream_id
276+
273277
query_map = {}
274278
cross_signing_keys_by_user = {}
275279
for user_id, device_id, update_stream_id, update_context in updates:
@@ -295,6 +299,8 @@ async def get_device_updates_by_remote(
295299
if update_stream_id > previous_update_stream_id:
296300
query_map[key] = (update_stream_id, update_context)
297301

302+
last_processed_stream_id = update_stream_id
303+
298304
results = await self._get_device_update_edus_by_remote(
299305
destination, from_stream_id, query_map
300306
)
@@ -307,7 +313,7 @@ async def get_device_updates_by_remote(
307313
# FIXME: remove this when enough servers have upgraded
308314
results.append(("org.matrix.signing_key_update", result))
309315

310-
return now_stream_id, results
316+
return last_processed_stream_id, results
311317

312318
def _get_device_updates_by_remote_txn(
313319
self,

tests/storage/test_devices.py

+49-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def test_count_devices_by_users(self):
9494
def test_get_device_updates_by_remote(self):
9595
device_ids = ["device_id1", "device_id2"]
9696

97-
# Add two device updates with a single stream_id
97+
# Add two device updates with sequential `stream_id`s
9898
self.get_success(
9999
self.store.add_device_change_to_streams("user_id", device_ids, ["somehost"])
100100
)
@@ -107,6 +107,54 @@ def test_get_device_updates_by_remote(self):
107107
# Check original device_ids are contained within these updates
108108
self._check_devices_in_updates(device_ids, device_updates)
109109

110+
def test_get_device_updates_by_remote_can_limit_properly(self):
111+
"""
112+
Tests that `get_device_updates_by_remote` returns an appropriate
113+
stream_id to resume fetching from (without skipping any results).
114+
"""
115+
116+
# Add some device updates with sequential `stream_id`s
117+
device_ids = [
118+
"device_id1",
119+
"device_id2",
120+
"device_id3",
121+
"device_id4",
122+
"device_id5",
123+
]
124+
self.get_success(
125+
self.store.add_device_change_to_streams("user_id", device_ids, ["somehost"])
126+
)
127+
128+
# Get all device updates ever meant for this remote
129+
next_stream_id, device_updates = self.get_success(
130+
self.store.get_device_updates_by_remote("somehost", -1, limit=3)
131+
)
132+
133+
# Check the first three original device_ids are contained within these updates
134+
self._check_devices_in_updates(device_ids[:3], device_updates)
135+
136+
# Get the next batch of device updates
137+
next_stream_id, device_updates = self.get_success(
138+
self.store.get_device_updates_by_remote("somehost", next_stream_id, limit=3)
139+
)
140+
141+
# Check the last two original device_ids are contained within these updates
142+
self._check_devices_in_updates(device_ids[3:], device_updates)
143+
144+
# Add some more device updates to ensure it still resumes properly
145+
device_ids = ["device_id6", "device_id7"]
146+
self.get_success(
147+
self.store.add_device_change_to_streams("user_id", device_ids, ["somehost"])
148+
)
149+
150+
# Get the next batch of device updates
151+
next_stream_id, device_updates = self.get_success(
152+
self.store.get_device_updates_by_remote("somehost", next_stream_id, limit=3)
153+
)
154+
155+
# Check the newly-added device_ids are contained within these updates
156+
self._check_devices_in_updates(device_ids, device_updates)
157+
110158
def _check_devices_in_updates(self, expected_device_ids, device_updates):
111159
"""Check that an specific device ids exist in a list of device update EDUs"""
112160
self.assertEqual(len(device_updates), len(expected_device_ids))

0 commit comments

Comments
 (0)