
Commit 641b6b2

erikjohnston authored and H-Shay committed
Replaces all usages of StreamIdGenerator with MultiWriterIdGenerator (element-hq#17229)
Replaces all usages of `StreamIdGenerator` with `MultiWriterIdGenerator`, which is safer.
1 parent 8e0e1d5 commit 641b6b2
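
`MultiWriterIdGenerator` hands out IDs from a database sequence and persists a position per writer instance, so it stays correct across restarts and when several workers write to the same stream; `StreamIdGenerator` tracks its position in process memory and assumes a single writer. Call sites allocate IDs the same way with either class; below is a minimal sketch of that pattern, with a hypothetical store method, table, and generator name (`add_thing`, `things`, `_thing_id_gen`), assuming the `get_next()` async context manager from `synapse.storage.util.id_generators`:

    # Minimal sketch; the method, table, and generator names are hypothetical.
    async def add_thing(self, user_id: str) -> int:
        # get_next() reserves the next ID from the backing sequence and, on exit,
        # records that this writer instance has written up to that position.
        async with self._thing_id_gen.get_next() as stream_id:
            await self.db_pool.simple_insert(
                table="things",
                values={
                    "stream_id": stream_id,
                    "instance_name": self._instance_name,
                    "user_id": user_id,
                },
                desc="add_thing",
            )
        return stream_id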

File tree

10 files changed, +227 -363 lines changed

changelog.d/17229.misc (+1)

@@ -0,0 +1 @@
+Replaces all usages of `StreamIdGenerator` with `MultiWriterIdGenerator`.

synapse/_scripts/synapse_port_db.py (+61 -10)

@@ -777,22 +777,74 @@ def alter_table(txn: LoggingTransaction) -> None:
         await self._setup_events_stream_seqs()
         await self._setup_sequence(
             "un_partial_stated_event_stream_sequence",
-            ("un_partial_stated_event_stream",),
+            [("un_partial_stated_event_stream", "stream_id")],
         )
         await self._setup_sequence(
-            "device_inbox_sequence", ("device_inbox", "device_federation_outbox")
+            "device_inbox_sequence",
+            [
+                ("device_inbox", "stream_id"),
+                ("device_federation_outbox", "stream_id"),
+            ],
         )
         await self._setup_sequence(
             "account_data_sequence",
-            ("room_account_data", "room_tags_revisions", "account_data"),
+            [
+                ("room_account_data", "stream_id"),
+                ("room_tags_revisions", "stream_id"),
+                ("account_data", "stream_id"),
+            ],
+        )
+        await self._setup_sequence(
+            "receipts_sequence",
+            [
+                ("receipts_linearized", "stream_id"),
+            ],
+        )
+        await self._setup_sequence(
+            "presence_stream_sequence",
+            [
+                ("presence_stream", "stream_id"),
+            ],
         )
-        await self._setup_sequence("receipts_sequence", ("receipts_linearized",))
-        await self._setup_sequence("presence_stream_sequence", ("presence_stream",))
         await self._setup_auth_chain_sequence()
         await self._setup_sequence(
             "application_services_txn_id_seq",
-            ("application_services_txns",),
-            "txn_id",
+            [
+                (
+                    "application_services_txns",
+                    "txn_id",
+                )
+            ],
+        )
+        await self._setup_sequence(
+            "device_lists_sequence",
+            [
+                ("device_lists_stream", "stream_id"),
+                ("user_signature_stream", "stream_id"),
+                ("device_lists_outbound_pokes", "stream_id"),
+                ("device_lists_changes_in_room", "stream_id"),
+                ("device_lists_remote_pending", "stream_id"),
+                ("device_lists_changes_converted_stream_position", "stream_id"),
+            ],
+        )
+        await self._setup_sequence(
+            "e2e_cross_signing_keys_sequence",
+            [
+                ("e2e_cross_signing_keys", "stream_id"),
+            ],
+        )
+        await self._setup_sequence(
+            "push_rules_stream_sequence",
+            [
+                ("push_rules_stream", "stream_id"),
+            ],
+        )
+        await self._setup_sequence(
+            "pushers_sequence",
+            [
+                ("pushers", "id"),
+                ("deleted_pushers", "stream_id"),
+            ],
         )
 
         # Step 3. Get tables.

@@ -1101,12 +1153,11 @@ def _setup_events_stream_seqs_set_pos(txn: LoggingTransaction) -> None:
     async def _setup_sequence(
         self,
         sequence_name: str,
-        stream_id_tables: Iterable[str],
-        column_name: str = "stream_id",
+        stream_id_tables: Iterable[Tuple[str, str]],
     ) -> None:
         """Set a sequence to the correct value."""
         current_stream_ids = []
-        for stream_id_table in stream_id_tables:
+        for stream_id_table, column_name in stream_id_tables:
             max_stream_id = cast(
                 int,
                 await self.sqlite_store.db_pool.simple_select_one_onecol(
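
For reference, the rest of `_setup_sequence` (below the lines shown in this hunk) turns those (table, column) pairs into a Postgres sequence position. The following is a rough sketch of the remainder, paraphrasing rather than quoting the implementation, so the exact SQL and the COALESCE default should be treated as approximate:

            max_stream_id = cast(
                int,
                await self.sqlite_store.db_pool.simple_select_one_onecol(
                    table=stream_id_table,
                    keyvalues={},
                    retcol=f"COALESCE(MAX({column_name}), 1)",
                    allow_none=True,
                ),
            )
            current_stream_ids.append(max_stream_id)

        next_id = max(current_stream_ids) + 1

        def r(txn: LoggingTransaction) -> None:
            # Restart the Postgres sequence just past the largest ID seen in any
            # of the listed tables, so newly allocated stream IDs cannot collide
            # with rows copied from the SQLite database.
            sql = "ALTER SEQUENCE %s RESTART WITH" % (sequence_name,)
            txn.execute(sql + " %s", (next_id,))

        await self.postgres_store.db_pool.runInteraction(
            "_setup_%s" % (sequence_name,), r
        )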

synapse/storage/databases/main/devices.py (+30 -24)

@@ -57,10 +57,7 @@
 from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore
 from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
 from synapse.storage.types import Cursor
-from synapse.storage.util.id_generators import (
-    AbstractStreamIdGenerator,
-    StreamIdGenerator,
-)
+from synapse.storage.util.id_generators import MultiWriterIdGenerator
 from synapse.types import (
     JsonDict,
     JsonMapping,

@@ -99,19 +96,21 @@ def __init__(
 
         # In the worker store this is an ID tracker which we overwrite in the non-worker
         # class below that is used on the main process.
-        self._device_list_id_gen = StreamIdGenerator(
-            db_conn,
-            hs.get_replication_notifier(),
-            "device_lists_stream",
-            "stream_id",
-            extra_tables=[
-                ("user_signature_stream", "stream_id"),
-                ("device_lists_outbound_pokes", "stream_id"),
-                ("device_lists_changes_in_room", "stream_id"),
-                ("device_lists_remote_pending", "stream_id"),
-                ("device_lists_changes_converted_stream_position", "stream_id"),
+        self._device_list_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="device_lists_stream",
+            instance_name=self._instance_name,
+            tables=[
+                ("device_lists_stream", "instance_name", "stream_id"),
+                ("user_signature_stream", "instance_name", "stream_id"),
+                ("device_lists_outbound_pokes", "instance_name", "stream_id"),
+                ("device_lists_changes_in_room", "instance_name", "stream_id"),
+                ("device_lists_remote_pending", "instance_name", "stream_id"),
             ],
-            is_writer=hs.config.worker.worker_app is None,
+            sequence_name="device_lists_sequence",
+            writers=["master"],
         )
 
         device_list_max = self._device_list_id_gen.get_current_token()

@@ -762,6 +761,7 @@ def _add_user_signature_change_txn(
                 "stream_id": stream_id,
                 "from_user_id": from_user_id,
                 "user_ids": json_encoder.encode(user_ids),
+                "instance_name": self._instance_name,
             },
         )
 

@@ -1582,6 +1582,8 @@ def __init__(
     ):
         super().__init__(database, db_conn, hs)
 
+        self._instance_name = hs.get_instance_name()
+
         self.db_pool.updates.register_background_index_update(
             "device_lists_stream_idx",
             index_name="device_lists_stream_user_id",

@@ -1694,6 +1696,7 @@ def _txn(txn: LoggingTransaction) -> int:
                 "device_lists_outbound_pokes",
                 {
                     "stream_id": stream_id,
+                    "instance_name": self._instance_name,
                     "destination": destination,
                     "user_id": user_id,
                     "device_id": device_id,

@@ -1730,10 +1733,6 @@ def _txn(txn: LoggingTransaction) -> int:
 
 
 class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
-    # Because we have write access, this will be a StreamIdGenerator
-    # (see DeviceWorkerStore.__init__)
-    _device_list_id_gen: AbstractStreamIdGenerator
-
     def __init__(
         self,
         database: DatabasePool,

@@ -2092,9 +2091,9 @@ def _add_device_change_to_stream_txn(
         self.db_pool.simple_insert_many_txn(
             txn,
             table="device_lists_stream",
-            keys=("stream_id", "user_id", "device_id"),
+            keys=("instance_name", "stream_id", "user_id", "device_id"),
             values=[
-                (stream_id, user_id, device_id)
+                (self._instance_name, stream_id, user_id, device_id)
                 for stream_id, device_id in zip(stream_ids, device_ids)
             ],
         )

@@ -2124,6 +2123,7 @@ def _add_device_outbound_poke_to_stream_txn(
         values = [
             (
                 destination,
+                self._instance_name,
                 next(stream_id_iterator),
                 user_id,
                 device_id,

@@ -2139,6 +2139,7 @@ def _add_device_outbound_poke_to_stream_txn(
             table="device_lists_outbound_pokes",
             keys=(
                 "destination",
+                "instance_name",
                 "stream_id",
                 "user_id",
                 "device_id",

@@ -2157,7 +2158,7 @@ def _add_device_outbound_poke_to_stream_txn(
                 device_id,
                 {
                     stream_id: destination
-                    for (destination, stream_id, _, _, _, _, _) in values
+                    for (destination, _, stream_id, _, _, _, _, _) in values
                 },
             )
 

@@ -2210,6 +2211,7 @@ def _add_device_outbound_room_poke_txn(
                 "device_id",
                 "room_id",
                 "stream_id",
+                "instance_name",
                 "converted_to_destinations",
                 "opentracing_context",
             ),

@@ -2219,6 +2221,7 @@ def _add_device_outbound_room_poke_txn(
                 device_id,
                 room_id,
                 stream_id,
+                self._instance_name,
                 # We only need to calculate outbound pokes for local users
                 not self.hs.is_mine_id(user_id),
                 encoded_context,

@@ -2338,7 +2341,10 @@ async def add_remote_device_list_to_pending(
                 "user_id": user_id,
                 "device_id": device_id,
             },
-            values={"stream_id": stream_id},
+            values={
+                "stream_id": stream_id,
+                "instance_name": self._instance_name,
+            },
             desc="add_remote_device_list_to_pending",
         )
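
The `stream_id_iterator` consumed in the outbound-pokes hunk above is produced by the generator's bulk allocator. Below is a minimal sketch of that calling pattern, assuming `MultiWriterIdGenerator.get_next_mult()`; the wrapper method and its argument order are illustrative, not copied from this commit:

    # Minimal sketch; the method name and txn-function arguments are illustrative.
    async def add_device_list_outbound_pokes(
        self, destinations: Collection[str], user_id: str, device_id: str
    ) -> None:
        # Reserve one stream ID per destination in a single call; the IDs come
        # from device_lists_sequence and are attributed to this writer instance.
        async with self._device_list_id_gen.get_next_mult(len(destinations)) as stream_ids:
            stream_id_iterator = iter(stream_ids)
            await self.db_pool.runInteraction(
                "add_device_list_outbound_pokes",
                self._add_device_outbound_poke_to_stream_txn,
                user_id,
                device_id,
                destinations,
                stream_id_iterator,
            )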

synapse/storage/databases/main/end_to_end_keys.py (+13 -6)

@@ -58,7 +58,7 @@
 )
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
 from synapse.storage.engines import PostgresEngine
-from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.storage.util.id_generators import MultiWriterIdGenerator
 from synapse.types import JsonDict, JsonMapping
 from synapse.util import json_decoder, json_encoder
 from synapse.util.caches.descriptors import cached, cachedList

@@ -1448,11 +1448,17 @@ def __init__(
     ):
         super().__init__(database, db_conn, hs)
 
-        self._cross_signing_id_gen = StreamIdGenerator(
-            db_conn,
-            hs.get_replication_notifier(),
-            "e2e_cross_signing_keys",
-            "stream_id",
+        self._cross_signing_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="e2e_cross_signing_keys",
+            instance_name=self._instance_name,
+            tables=[
+                ("e2e_cross_signing_keys", "instance_name", "stream_id"),
+            ],
+            sequence_name="e2e_cross_signing_keys_sequence",
+            writers=["master"],
         )
 
     async def set_e2e_device_keys(

@@ -1627,6 +1633,7 @@ def _set_e2e_cross_signing_key_txn(
                 "keytype": key_type,
                 "keydata": json_encoder.encode(key),
                 "stream_id": stream_id,
+                "instance_name": self._instance_name,
             },
         )
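
The generator above requires the `e2e_cross_signing_keys_sequence` sequence and the `instance_name` column to exist in the schema; they are presumably provided by schema deltas elsewhere in this commit (files not shown on this page). The following is a hedged sketch of what such a delta has to provide, written in the style of a Python schema-delta `run_create` hook; the exact SQL, defaults, and file are assumptions, not taken from the commit:

    # Illustrative only: the real schema deltas are in files not shown here.
    # The sequence only matters on Postgres; MultiWriterIdGenerator emulates
    # sequences when running on SQLite.
    def run_create(cur, database_engine) -> None:
        # Column written by the stores above (skip if it already exists).
        cur.execute("ALTER TABLE e2e_cross_signing_keys ADD COLUMN instance_name TEXT")
        cur.execute("CREATE SEQUENCE IF NOT EXISTS e2e_cross_signing_keys_sequence")
        # Start the sequence past any existing stream IDs so new allocations
        # do not collide with rows written before the migration.
        cur.execute(
            "SELECT setval('e2e_cross_signing_keys_sequence', "
            "(SELECT COALESCE(MAX(stream_id), 1) FROM e2e_cross_signing_keys))"
        )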

synapse/storage/databases/main/push_rule.py (+14 -10)

@@ -53,7 +53,7 @@
 from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
-from synapse.storage.util.id_generators import IdGenerator, StreamIdGenerator
+from synapse.storage.util.id_generators import IdGenerator, MultiWriterIdGenerator
 from synapse.synapse_rust.push import FilteredPushRules, PushRule, PushRules
 from synapse.types import JsonDict
 from synapse.util import json_encoder, unwrapFirstError

@@ -126,7 +126,7 @@ class PushRulesWorkerStore(
     `get_max_push_rules_stream_id` which can be called in the initializer.
     """
 
-    _push_rules_stream_id_gen: StreamIdGenerator
+    _push_rules_stream_id_gen: MultiWriterIdGenerator
 
     def __init__(
         self,

@@ -140,14 +140,17 @@ def __init__(
             hs.get_instance_name() in hs.config.worker.writers.push_rules
         )
 
-        # In the worker store this is an ID tracker which we overwrite in the non-worker
-        # class below that is used on the main process.
-        self._push_rules_stream_id_gen = StreamIdGenerator(
-            db_conn,
-            hs.get_replication_notifier(),
-            "push_rules_stream",
-            "stream_id",
-            is_writer=self._is_push_writer,
+        self._push_rules_stream_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="push_rules_stream",
+            instance_name=self._instance_name,
+            tables=[
+                ("push_rules_stream", "instance_name", "stream_id"),
+            ],
+            sequence_name="push_rules_stream_sequence",
+            writers=hs.config.worker.writers.push_rules,
        )
 
         push_rules_prefill, push_rules_id = self.db_pool.get_cache_dict(

@@ -880,6 +883,7 @@ def _insert_push_rules_update_txn(
             raise Exception("Not a push writer")
 
         values = {
+            "instance_name": self._instance_name,
            "stream_id": stream_id,
            "event_stream_ordering": event_stream_ordering,
            "user_id": user_id,