Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 9b43df1

Browse files
authored
Optimise _get_state_after_missing_prev_event: use /state (#12040)
If we're missing most of the events in the room state, then we may as well call the /state endpoint, instead of individually requesting each and every event.
1 parent e440930 commit 9b43df1

File tree

4 files changed

+268
-9
lines changed

4 files changed

+268
-9
lines changed

changelog.d/12040.feature

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Optimise fetching large quantities of missing room state over federation.

synapse/handlers/federation_event.py

+39-4
Original file line numberDiff line numberDiff line change
@@ -897,10 +897,24 @@ async def _get_state_after_missing_prev_event(
897897
logger.debug("We are also missing %i auth events", len(missing_auth_events))
898898

899899
missing_events = missing_desired_events | missing_auth_events
900-
logger.debug("Fetching %i events from remote", len(missing_events))
901-
await self._get_events_and_persist(
902-
destination=destination, room_id=room_id, event_ids=missing_events
903-
)
900+
901+
# Making an individual request for each of 1000s of events has a lot of
902+
# overhead. On the other hand, we don't really want to fetch all of the events
903+
# if we already have most of them.
904+
#
905+
# As an arbitrary heuristic, if we are missing more than 10% of the events, then
906+
# we fetch the whole state.
907+
#
908+
# TODO: might it be better to have an API which lets us do an aggregate event
909+
# request
910+
if (len(missing_events) * 10) >= len(auth_event_ids) + len(state_event_ids):
911+
logger.debug("Requesting complete state from remote")
912+
await self._get_state_and_persist(destination, room_id, event_id)
913+
else:
914+
logger.debug("Fetching %i events from remote", len(missing_events))
915+
await self._get_events_and_persist(
916+
destination=destination, room_id=room_id, event_ids=missing_events
917+
)
904918

905919
# we need to make sure we re-load from the database to get the rejected
906920
# state correct.
@@ -959,6 +973,27 @@ async def _get_state_after_missing_prev_event(
959973

960974
return remote_state
961975

976+
async def _get_state_and_persist(
977+
self, destination: str, room_id: str, event_id: str
978+
) -> None:
979+
"""Get the complete room state at a given event, and persist any new events
980+
as outliers"""
981+
room_version = await self._store.get_room_version(room_id)
982+
auth_events, state_events = await self._federation_client.get_room_state(
983+
destination, room_id, event_id=event_id, room_version=room_version
984+
)
985+
logger.info("/state returned %i events", len(auth_events) + len(state_events))
986+
987+
await self._auth_and_persist_outliers(
988+
room_id, itertools.chain(auth_events, state_events)
989+
)
990+
991+
# we also need the event itself.
992+
if not await self._store.have_seen_event(room_id, event_id):
993+
await self._get_events_and_persist(
994+
destination=destination, room_id=room_id, event_ids=(event_id,)
995+
)
996+
962997
async def _process_received_pdu(
963998
self,
964999
origin: str,

synapse/storage/databases/main/events_worker.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
Dict,
2323
Iterable,
2424
List,
25-
NoReturn,
2625
Optional,
2726
Set,
2827
Tuple,
@@ -1330,10 +1329,9 @@ def have_seen_events_txn(
13301329
return results
13311330

13321331
@cached(max_entries=100000, tree=True)
1333-
async def have_seen_event(self, room_id: str, event_id: str) -> NoReturn:
1334-
# this only exists for the benefit of the @cachedList descriptor on
1335-
# _have_seen_events_dict
1336-
raise NotImplementedError()
1332+
async def have_seen_event(self, room_id: str, event_id: str) -> bool:
1333+
res = await self._have_seen_events_dict(((room_id, event_id),))
1334+
return res[(room_id, event_id)]
13371335

13381336
def _get_current_state_event_counts_txn(
13391337
self, txn: LoggingTransaction, room_id: str
+225
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
# Copyright 2022 The Matrix.org Foundation C.I.C.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
from unittest import mock
15+
16+
from synapse.events import make_event_from_dict
17+
from synapse.events.snapshot import EventContext
18+
from synapse.federation.transport.client import StateRequestResponse
19+
from synapse.logging.context import LoggingContext
20+
from synapse.rest import admin
21+
from synapse.rest.client import login, room
22+
23+
from tests import unittest
24+
from tests.test_utils import event_injection, make_awaitable
25+
26+
27+
class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
28+
servlets = [
29+
admin.register_servlets,
30+
login.register_servlets,
31+
room.register_servlets,
32+
]
33+
34+
def make_homeserver(self, reactor, clock):
35+
# mock out the federation transport client
36+
self.mock_federation_transport_client = mock.Mock(
37+
spec=["get_room_state_ids", "get_room_state", "get_event"]
38+
)
39+
return super().setup_test_homeserver(
40+
federation_transport_client=self.mock_federation_transport_client
41+
)
42+
43+
def test_process_pulled_event_with_missing_state(self) -> None:
44+
"""Ensure that we correctly handle pulled events with lots of missing state
45+
46+
In this test, we pretend we are processing a "pulled" event (eg, via backfill
47+
or get_missing_events). The pulled event has a prev_event we haven't previously
48+
seen, so the server requests the state at that prev_event. There is a lot
49+
of state we don't have, so we expect the server to make a /state request.
50+
51+
We check that the pulled event is correctly persisted, and that the state is
52+
as we expect.
53+
"""
54+
return self._test_process_pulled_event_with_missing_state(False)
55+
56+
def test_process_pulled_event_with_missing_state_where_prev_is_outlier(
57+
self,
58+
) -> None:
59+
"""Ensure that we correctly handle pulled events with lots of missing state
60+
61+
A slight modification to test_process_pulled_event_with_missing_state. Again
62+
we have a "pulled" event which refers to a prev_event with lots of state,
63+
but in this case we already have the prev_event (as an outlier, obviously -
64+
if it were a regular event, we wouldn't need to request the state).
65+
"""
66+
return self._test_process_pulled_event_with_missing_state(True)
67+
68+
def _test_process_pulled_event_with_missing_state(
69+
self, prev_exists_as_outlier: bool
70+
) -> None:
71+
OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
72+
main_store = self.hs.get_datastores().main
73+
state_storage = self.hs.get_storage().state
74+
75+
# create the room
76+
user_id = self.register_user("kermit", "test")
77+
tok = self.login("kermit", "test")
78+
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
79+
room_version = self.get_success(main_store.get_room_version(room_id))
80+
81+
# allow the remote user to send state events
82+
self.helper.send_state(
83+
room_id,
84+
"m.room.power_levels",
85+
{"events_default": 0, "state_default": 0},
86+
tok=tok,
87+
)
88+
89+
# add the remote user to the room
90+
member_event = self.get_success(
91+
event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
92+
)
93+
94+
initial_state_map = self.get_success(main_store.get_current_state_ids(room_id))
95+
96+
auth_event_ids = [
97+
initial_state_map[("m.room.create", "")],
98+
initial_state_map[("m.room.power_levels", "")],
99+
initial_state_map[("m.room.join_rules", "")],
100+
member_event.event_id,
101+
]
102+
103+
# mock up a load of state events which we are missing
104+
state_events = [
105+
make_event_from_dict(
106+
self.add_hashes_and_signatures(
107+
{
108+
"type": "test_state_type",
109+
"state_key": f"state_{i}",
110+
"room_id": room_id,
111+
"sender": OTHER_USER,
112+
"prev_events": [member_event.event_id],
113+
"auth_events": auth_event_ids,
114+
"origin_server_ts": 1,
115+
"depth": 10,
116+
"content": {"body": f"state_{i}"},
117+
}
118+
),
119+
room_version,
120+
)
121+
for i in range(1, 10)
122+
]
123+
124+
# this is the state that we are going to claim is active at the prev_event.
125+
state_at_prev_event = state_events + self.get_success(
126+
main_store.get_events_as_list(initial_state_map.values())
127+
)
128+
129+
# mock up a prev event.
130+
# Depending on the test, we either persist this upfront (as an outlier),
131+
# or let the server request it.
132+
prev_event = make_event_from_dict(
133+
self.add_hashes_and_signatures(
134+
{
135+
"type": "test_regular_type",
136+
"room_id": room_id,
137+
"sender": OTHER_USER,
138+
"prev_events": [],
139+
"auth_events": auth_event_ids,
140+
"origin_server_ts": 1,
141+
"depth": 11,
142+
"content": {"body": "missing_prev"},
143+
}
144+
),
145+
room_version,
146+
)
147+
if prev_exists_as_outlier:
148+
prev_event.internal_metadata.outlier = True
149+
persistence = self.hs.get_storage().persistence
150+
self.get_success(
151+
persistence.persist_event(prev_event, EventContext.for_outlier())
152+
)
153+
else:
154+
155+
async def get_event(destination: str, event_id: str, timeout=None):
156+
self.assertEqual(destination, self.OTHER_SERVER_NAME)
157+
self.assertEqual(event_id, prev_event.event_id)
158+
return {"pdus": [prev_event.get_pdu_json()]}
159+
160+
self.mock_federation_transport_client.get_event.side_effect = get_event
161+
162+
# mock up a regular event to pass into _process_pulled_event
163+
pulled_event = make_event_from_dict(
164+
self.add_hashes_and_signatures(
165+
{
166+
"type": "test_regular_type",
167+
"room_id": room_id,
168+
"sender": OTHER_USER,
169+
"prev_events": [prev_event.event_id],
170+
"auth_events": auth_event_ids,
171+
"origin_server_ts": 1,
172+
"depth": 12,
173+
"content": {"body": "pulled"},
174+
}
175+
),
176+
room_version,
177+
)
178+
179+
# we expect an outbound request to /state_ids, so stub that out
180+
self.mock_federation_transport_client.get_room_state_ids.return_value = (
181+
make_awaitable(
182+
{
183+
"pdu_ids": [e.event_id for e in state_at_prev_event],
184+
"auth_chain_ids": [],
185+
}
186+
)
187+
)
188+
189+
# we also expect an outbound request to /state
190+
self.mock_federation_transport_client.get_room_state.return_value = (
191+
make_awaitable(
192+
StateRequestResponse(auth_events=[], state=state_at_prev_event)
193+
)
194+
)
195+
196+
# we have to bump the clock a bit, to keep the retry logic in
197+
# FederationClient.get_pdu happy
198+
self.reactor.advance(60000)
199+
200+
# Finally, the call under test: send the pulled event into _process_pulled_event
201+
with LoggingContext("test"):
202+
self.get_success(
203+
self.hs.get_federation_event_handler()._process_pulled_event(
204+
self.OTHER_SERVER_NAME, pulled_event, backfilled=False
205+
)
206+
)
207+
208+
# check that the event is correctly persisted
209+
persisted = self.get_success(main_store.get_event(pulled_event.event_id))
210+
self.assertIsNotNone(persisted, "pulled event was not persisted at all")
211+
self.assertFalse(
212+
persisted.internal_metadata.is_outlier(), "pulled event was an outlier"
213+
)
214+
215+
# check that the state at that event is as expected
216+
state = self.get_success(
217+
state_storage.get_state_ids_for_event(pulled_event.event_id)
218+
)
219+
expected_state = {
220+
(e.type, e.state_key): e.event_id for e in state_at_prev_event
221+
}
222+
self.assertEqual(state, expected_state)
223+
224+
if prev_exists_as_outlier:
225+
self.mock_federation_transport_client.get_event.assert_not_called()

0 commit comments

Comments
 (0)