Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 29269d9

Browse files
Fix have_seen_event cache not being invalidated (#13863)
Fix #13856 Fix #13865 > Discovered while trying to make Synapse fast enough for [this MSC2716 test for importing many batches](matrix-org/complement#214 (comment)). As an example, disabling the `have_seen_event` cache saves 10 seconds for each `/messages` request in that MSC2716 Complement test because we're not making as many federation requests for `/state` (speeding up `have_seen_event` itself is related to #13625) > > But this will also make `/messages` faster in general so we can include it in the [faster `/messages` milestone](https://github.com/matrix-org/synapse/milestone/11). > > *-- #13856 ### The problem `_invalidate_caches_for_event` doesn't run in monolith mode which means we never even tried to clear the `have_seen_event` and other caches. And even in worker mode, it only runs on the workers, not the master (AFAICT). Additionally, there was a bug with the key being wrong so `_invalidate_caches_for_event` never invalidates the `have_seen_event` cache even when it does run. Because we were using the `@cachedList` wrong, it was putting items in the cache under keys like `((room_id, event_id),)` with a tuple nested in a tuple (ex. `(('!TnCIJPKzdQdUlIyXdQ:test', '$Iu0eqEBN7qcyF1S9B3oNB3I91v2o5YOgRNPwi_78s-k'),)`) and we were trying to invalidate with just `(room_id, event_id)`, which did nothing.
1 parent 35e9d6a commit 29269d9

File tree

5 files changed

+165
-67
lines changed

5 files changed

+165
-67
lines changed

changelog.d/13863.bugfix

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix `have_seen_event` cache not being invalidated after we persist an event which causes inefficiency effects like extra `/state` federation calls.

synapse/storage/databases/main/events_worker.py

+22-18
Original file line numberDiff line numberDiff line change
@@ -1474,32 +1474,38 @@ async def have_seen_events(
14741474
# the batches as big as possible.
14751475

14761476
results: Set[str] = set()
1477-
for chunk in batch_iter(event_ids, 500):
1478-
r = await self._have_seen_events_dict(
1479-
[(room_id, event_id) for event_id in chunk]
1477+
for event_ids_chunk in batch_iter(event_ids, 500):
1478+
events_seen_dict = await self._have_seen_events_dict(
1479+
room_id, event_ids_chunk
1480+
)
1481+
results.update(
1482+
eid for (eid, have_event) in events_seen_dict.items() if have_event
14801483
)
1481-
results.update(eid for ((_rid, eid), have_event) in r.items() if have_event)
14821484

14831485
return results
14841486

1485-
@cachedList(cached_method_name="have_seen_event", list_name="keys")
1487+
@cachedList(cached_method_name="have_seen_event", list_name="event_ids")
14861488
async def _have_seen_events_dict(
1487-
self, keys: Collection[Tuple[str, str]]
1488-
) -> Dict[Tuple[str, str], bool]:
1489+
self,
1490+
room_id: str,
1491+
event_ids: Collection[str],
1492+
) -> Dict[str, bool]:
14891493
"""Helper for have_seen_events
14901494
14911495
Returns:
1492-
a dict {(room_id, event_id)-> bool}
1496+
a dict {event_id -> bool}
14931497
"""
14941498
# if the event cache contains the event, obviously we've seen it.
14951499

14961500
cache_results = {
1497-
(rid, eid)
1498-
for (rid, eid) in keys
1499-
if await self._get_event_cache.contains((eid,))
1501+
event_id
1502+
for event_id in event_ids
1503+
if await self._get_event_cache.contains((event_id,))
15001504
}
15011505
results = dict.fromkeys(cache_results, True)
1502-
remaining = [k for k in keys if k not in cache_results]
1506+
remaining = [
1507+
event_id for event_id in event_ids if event_id not in cache_results
1508+
]
15031509
if not remaining:
15041510
return results
15051511

@@ -1511,23 +1517,21 @@ def have_seen_events_txn(txn: LoggingTransaction) -> None:
15111517

15121518
sql = "SELECT event_id FROM events AS e WHERE "
15131519
clause, args = make_in_list_sql_clause(
1514-
txn.database_engine, "e.event_id", [eid for (_rid, eid) in remaining]
1520+
txn.database_engine, "e.event_id", remaining
15151521
)
15161522
txn.execute(sql + clause, args)
15171523
found_events = {eid for eid, in txn}
15181524

15191525
# ... and then we can update the results for each key
1520-
results.update(
1521-
{(rid, eid): (eid in found_events) for (rid, eid) in remaining}
1522-
)
1526+
results.update({eid: (eid in found_events) for eid in remaining})
15231527

15241528
await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn)
15251529
return results
15261530

15271531
@cached(max_entries=100000, tree=True)
15281532
async def have_seen_event(self, room_id: str, event_id: str) -> bool:
1529-
res = await self._have_seen_events_dict(((room_id, event_id),))
1530-
return res[(room_id, event_id)]
1533+
res = await self._have_seen_events_dict(room_id, [event_id])
1534+
return res[event_id]
15311535

15321536
def _get_current_state_event_counts_txn(
15331537
self, txn: LoggingTransaction, room_id: str

synapse/util/caches/descriptors.py

+6
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,12 @@ def __get__(
431431
cache: DeferredCache[CacheKey, Any] = cached_method.cache
432432
num_args = cached_method.num_args
433433

434+
if num_args != self.num_args:
435+
raise Exception(
436+
"Number of args (%s) does not match underlying cache_method_name=%s (%s)."
437+
% (self.num_args, self.cached_method_name, num_args)
438+
)
439+
434440
@functools.wraps(self.orig)
435441
def wrapped(*args: Any, **kwargs: Any) -> "defer.Deferred[Dict]":
436442
# If we're passed a cache_context then we'll want to call its

tests/storage/databases/main/test_events_worker.py

+104-48
Original file line numberDiff line numberDiff line change
@@ -35,66 +35,45 @@
3535
from synapse.util.async_helpers import yieldable_gather_results
3636

3737
from tests import unittest
38+
from tests.test_utils.event_injection import create_event, inject_event
3839

3940

4041
class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
42+
servlets = [
43+
admin.register_servlets,
44+
room.register_servlets,
45+
login.register_servlets,
46+
]
47+
4148
def prepare(self, reactor, clock, hs):
49+
self.hs = hs
4250
self.store: EventsWorkerStore = hs.get_datastores().main
4351

44-
# insert some test data
45-
for rid in ("room1", "room2"):
46-
self.get_success(
47-
self.store.db_pool.simple_insert(
48-
"rooms",
49-
{"room_id": rid, "room_version": 4},
50-
)
51-
)
52+
self.user = self.register_user("user", "pass")
53+
self.token = self.login(self.user, "pass")
54+
self.room_id = self.helper.create_room_as(self.user, tok=self.token)
5255

5356
self.event_ids: List[str] = []
54-
for idx, rid in enumerate(
55-
(
56-
"room1",
57-
"room1",
58-
"room1",
59-
"room2",
60-
)
61-
):
62-
event_json = {"type": f"test {idx}", "room_id": rid}
63-
event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
64-
event_id = event.event_id
65-
66-
self.get_success(
67-
self.store.db_pool.simple_insert(
68-
"events",
69-
{
70-
"event_id": event_id,
71-
"room_id": rid,
72-
"topological_ordering": idx,
73-
"stream_ordering": idx,
74-
"type": event.type,
75-
"processed": True,
76-
"outlier": False,
77-
},
57+
for i in range(3):
58+
event = self.get_success(
59+
inject_event(
60+
hs,
61+
room_version=RoomVersions.V7.identifier,
62+
room_id=self.room_id,
63+
sender=self.user,
64+
type="test_event_type",
65+
content={"body": f"foobarbaz{i}"},
7866
)
7967
)
80-
self.get_success(
81-
self.store.db_pool.simple_insert(
82-
"event_json",
83-
{
84-
"event_id": event_id,
85-
"room_id": rid,
86-
"json": json.dumps(event_json),
87-
"internal_metadata": "{}",
88-
"format_version": 3,
89-
},
90-
)
91-
)
92-
self.event_ids.append(event_id)
68+
69+
self.event_ids.append(event.event_id)
9370

9471
def test_simple(self):
9572
with LoggingContext(name="test") as ctx:
9673
res = self.get_success(
97-
self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
74+
self.store.have_seen_events(
75+
self.room_id, [self.event_ids[0], "eventdoesnotexist"]
76+
)
9877
)
9978
self.assertEqual(res, {self.event_ids[0]})
10079

@@ -104,7 +83,9 @@ def test_simple(self):
10483
# a second lookup of the same events should cause no queries
10584
with LoggingContext(name="test") as ctx:
10685
res = self.get_success(
107-
self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
86+
self.store.have_seen_events(
87+
self.room_id, [self.event_ids[0], "eventdoesnotexist"]
88+
)
10889
)
10990
self.assertEqual(res, {self.event_ids[0]})
11091
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
@@ -116,11 +97,86 @@ def test_query_via_event_cache(self):
11697
# looking it up should now cause no db hits
11798
with LoggingContext(name="test") as ctx:
11899
res = self.get_success(
119-
self.store.have_seen_events("room1", [self.event_ids[0]])
100+
self.store.have_seen_events(self.room_id, [self.event_ids[0]])
120101
)
121102
self.assertEqual(res, {self.event_ids[0]})
122103
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
123104

105+
def test_persisting_event_invalidates_cache(self):
106+
"""
107+
Test to make sure that the `have_seen_event` cache
108+
is invalidated after we persist an event and returns
109+
the updated value.
110+
"""
111+
event, event_context = self.get_success(
112+
create_event(
113+
self.hs,
114+
room_id=self.room_id,
115+
sender=self.user,
116+
type="test_event_type",
117+
content={"body": "garply"},
118+
)
119+
)
120+
121+
with LoggingContext(name="test") as ctx:
122+
# First, check `have_seen_event` for an event we have not seen yet
123+
# to prime the cache with a `false` value.
124+
res = self.get_success(
125+
self.store.have_seen_events(event.room_id, [event.event_id])
126+
)
127+
self.assertEqual(res, set())
128+
129+
# That should result in a single db query to lookup
130+
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
131+
132+
# Persist the event which should invalidate or prefill the
133+
# `have_seen_event` cache so we don't return stale values.
134+
persistence = self.hs.get_storage_controllers().persistence
135+
self.get_success(
136+
persistence.persist_event(
137+
event,
138+
event_context,
139+
)
140+
)
141+
142+
with LoggingContext(name="test") as ctx:
143+
# Check `have_seen_event` again and we should see the updated fact
144+
# that we have now seen the event after persisting it.
145+
res = self.get_success(
146+
self.store.have_seen_events(event.room_id, [event.event_id])
147+
)
148+
self.assertEqual(res, {event.event_id})
149+
150+
# That should result in a single db query to lookup
151+
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
152+
153+
def test_invalidate_cache_by_room_id(self):
154+
"""
155+
Test to make sure that all events associated with the given `(room_id,)`
156+
are invalidated in the `have_seen_event` cache.
157+
"""
158+
with LoggingContext(name="test") as ctx:
159+
# Prime the cache with some values
160+
res = self.get_success(
161+
self.store.have_seen_events(self.room_id, self.event_ids)
162+
)
163+
self.assertEqual(res, set(self.event_ids))
164+
165+
# That should result in a single db query to lookup
166+
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
167+
168+
# Clear the cache with any events associated with the `room_id`
169+
self.store.have_seen_event.invalidate((self.room_id,))
170+
171+
with LoggingContext(name="test") as ctx:
172+
res = self.get_success(
173+
self.store.have_seen_events(self.room_id, self.event_ids)
174+
)
175+
self.assertEqual(res, set(self.event_ids))
176+
177+
# Since we cleared the cache, it should result in another db query to lookup
178+
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
179+
124180

125181
class EventCacheTestCase(unittest.HomeserverTestCase):
126182
"""Test that the various layers of event cache works."""

tests/util/caches/test_descriptors.py

+32-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515
import logging
16-
from typing import Set
16+
from typing import Iterable, Set, Tuple
1717
from unittest import mock
1818

1919
from twisted.internet import defer, reactor
@@ -1008,3 +1008,34 @@ async def do_lookup():
10081008
obj.inner_context_was_finished, "Tried to restart a finished logcontext"
10091009
)
10101010
self.assertEqual(current_context(), SENTINEL_CONTEXT)
1011+
1012+
def test_num_args_mismatch(self):
1013+
"""
1014+
Make sure someone does not accidentally use @cachedList on a method with
1015+
a mismatch in the number args to the underlying single cache method.
1016+
"""
1017+
1018+
class Cls:
1019+
@descriptors.cached(tree=True)
1020+
def fn(self, room_id, event_id):
1021+
pass
1022+
1023+
# This is wrong ❌. `@cachedList` expects to be given the same number
1024+
# of arguments as the underlying cached function, just with one of
1025+
# the arguments being an iterable
1026+
@descriptors.cachedList(cached_method_name="fn", list_name="keys")
1027+
def list_fn(self, keys: Iterable[Tuple[str, str]]):
1028+
pass
1029+
1030+
# Corrected syntax ✅
1031+
#
1032+
# @cachedList(cached_method_name="fn", list_name="event_ids")
1033+
# async def list_fn(
1034+
# self, room_id: str, event_ids: Collection[str],
1035+
# )
1036+
1037+
obj = Cls()
1038+
1039+
# Make sure this raises an error about the arg mismatch
1040+
with self.assertRaises(Exception):
1041+
obj.list_fn([("foo", "bar")])

0 commit comments

Comments
 (0)