This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit afefc55

Revert "Remove slaved id tracker (#14376)"
This reverts commit 36097e8.
1 parent 945a092

File tree

11 files changed, +176 −74 lines


changelog.d/14376.misc

-1
This file was deleted.

synapse/replication/slave/__init__.py

+13
@@ -0,0 +1,13 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
synapse/replication/slave/storage/__init__.py

+13

@@ -0,0 +1,13 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
synapse/replication/slave/storage/_slaved_id_tracker.py

+50

@@ -0,0 +1,50 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import List, Optional, Tuple
+
+from synapse.storage.database import LoggingDatabaseConnection
+from synapse.storage.util.id_generators import AbstractStreamIdTracker, _load_current_id
+
+
+class SlavedIdTracker(AbstractStreamIdTracker):
+    """Tracks the "current" stream ID of a stream with a single writer.
+
+    See `AbstractStreamIdTracker` for more details.
+
+    Note that this class does not work correctly when there are multiple
+    writers.
+    """
+
+    def __init__(
+        self,
+        db_conn: LoggingDatabaseConnection,
+        table: str,
+        column: str,
+        extra_tables: Optional[List[Tuple[str, str]]] = None,
+        step: int = 1,
+    ):
+        self.step = step
+        self._current = _load_current_id(db_conn, table, column, step)
+        if extra_tables:
+            for table, column in extra_tables:
+                self.advance(None, _load_current_id(db_conn, table, column))
+
+    def advance(self, instance_name: Optional[str], new_id: int) -> None:
+        self._current = (max if self.step > 0 else min)(self._current, new_id)
+
+    def get_current_token(self) -> int:
+        return self._current
+
+    def get_current_token_for_writer(self, instance_name: str) -> int:
+        return self.get_current_token()

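For orientation, the behaviour of the re-added tracker is easy to demonstrate in isolation. The following is a minimal sketch (hypothetical code, not part of this commit): the `Tracker` class below mirrors `SlavedIdTracker`'s advance logic, with `_load_current_id`'s database lookup replaced by a plain integer.

from typing import Optional


class Tracker:
    """Hypothetical stand-in mirroring SlavedIdTracker's advance logic."""

    def __init__(self, current: int, step: int = 1) -> None:
        # In the real class, `current` comes from
        # _load_current_id(db_conn, table, column, step).
        self.step = step
        self._current = current

    def advance(self, instance_name: Optional[str], new_id: int) -> None:
        # Streams normally move forward (step > 0); the backfill stream
        # moves backward (step = -1), so take max or min accordingly.
        self._current = (max if self.step > 0 else min)(self._current, new_id)

    def get_current_token(self) -> int:
        return self._current


forward = Tracker(10)
forward.advance(None, 15)
forward.advance(None, 12)  # out-of-order update: the token stays at 15
assert forward.get_current_token() == 15

backfill = Tracker(-10, step=-1)
backfill.advance(None, -20)
assert backfill.get_current_token() == -20
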
synapse/storage/databases/main/account_data.py

+20-10
@@ -27,6 +27,7 @@
 )
 
 from synapse.api.constants import AccountDataTypes
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import AccountDataStream, TagAccountDataStream
 from synapse.storage._base import db_to_json
 from synapse.storage.database import (
@@ -67,11 +68,12 @@ def __init__(
         # to write account data. A value of `True` implies that `_account_data_id_gen`
         # is an `AbstractStreamIdGenerator` and not just a tracker.
         self._account_data_id_gen: AbstractStreamIdTracker
-        self._can_write_to_account_data = (
-            self._instance_name in hs.config.worker.writers.account_data
-        )
 
         if isinstance(database.engine, PostgresEngine):
+            self._can_write_to_account_data = (
+                self._instance_name in hs.config.worker.writers.account_data
+            )
+
             self._account_data_id_gen = MultiWriterIdGenerator(
                 db_conn=db_conn,
                 db=database,
@@ -93,13 +95,21 @@ def __init__(
             # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
             # updated over replication. (Multiple writers are not supported for
             # SQLite).
-            self._account_data_id_gen = StreamIdGenerator(
-                db_conn,
-                "room_account_data",
-                "stream_id",
-                extra_tables=[("room_tags_revisions", "stream_id")],
-                is_writer=self._instance_name in hs.config.worker.writers.account_data,
-            )
+            if self._instance_name in hs.config.worker.writers.account_data:
+                self._can_write_to_account_data = True
+                self._account_data_id_gen = StreamIdGenerator(
+                    db_conn,
+                    "room_account_data",
+                    "stream_id",
+                    extra_tables=[("room_tags_revisions", "stream_id")],
+                )
+            else:
+                self._account_data_id_gen = SlavedIdTracker(
+                    db_conn,
+                    "room_account_data",
+                    "stream_id",
+                    extra_tables=[("room_tags_revisions", "stream_id")],
+                )
 
         account_max = self.get_max_account_data_stream_id()
         self._account_data_stream_cache = StreamChangeCache(
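
The same restored pattern repeats in the remaining files of this revert (devices.py, events_worker.py, push_rule.py, pusher.py and receipts.py): at construction time each store checks whether this instance is the configured writer and picks either a real `StreamIdGenerator` or a replication-fed `SlavedIdTracker`. A minimal sketch of the idea, using hypothetical `IdGenerator`/`IdTracker`/`make_id_gen` stand-ins rather than the real Synapse classes:

from typing import Protocol, Set


class StreamIdTrackerLike(Protocol):
    """Read interface shared by the writer and the replication tracker."""

    def get_current_token(self) -> int:
        ...


class IdGenerator:
    """Hypothetical writer: hands out new stream IDs."""

    def __init__(self, current: int = 0) -> None:
        self._current = current

    def get_next(self) -> int:
        self._current += 1
        return self._current

    def get_current_token(self) -> int:
        return self._current


class IdTracker:
    """Hypothetical reader: only follows IDs announced over replication."""

    def __init__(self, current: int = 0) -> None:
        self._current = current

    def advance(self, new_id: int) -> None:
        self._current = max(self._current, new_id)

    def get_current_token(self) -> int:
        return self._current


def make_id_gen(instance_name: str, writers: Set[str]) -> StreamIdTrackerLike:
    # The restored pattern: the configured writer gets a real generator,
    # every other worker gets a tracker advanced over replication.
    if instance_name in writers:
        return IdGenerator()
    return IdTracker()


main = make_id_gen("master", writers={"master"})
worker = make_id_gen("generic_worker1", writers={"master"})
assert isinstance(main, IdGenerator) and isinstance(worker, IdTracker)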

synapse/storage/databases/main/devices.py

+23-13
@@ -38,6 +38,7 @@
     whitelisted_homeserver,
 )
 from synapse.metrics.background_process_metrics import wrap_as_background_process
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
@@ -85,19 +86,28 @@ def __init__(
     ):
         super().__init__(database, db_conn, hs)
 
-        # In the worker store this is an ID tracker which we overwrite in the non-worker
-        # class below that is used on the main process.
-        self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
-            db_conn,
-            "device_lists_stream",
-            "stream_id",
-            extra_tables=[
-                ("user_signature_stream", "stream_id"),
-                ("device_lists_outbound_pokes", "stream_id"),
-                ("device_lists_changes_in_room", "stream_id"),
-            ],
-            is_writer=hs.config.worker.worker_app is None,
-        )
+        if hs.config.worker.worker_app is None:
+            self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
+                db_conn,
+                "device_lists_stream",
+                "stream_id",
+                extra_tables=[
+                    ("user_signature_stream", "stream_id"),
+                    ("device_lists_outbound_pokes", "stream_id"),
+                    ("device_lists_changes_in_room", "stream_id"),
+                ],
+            )
+        else:
+            self._device_list_id_gen = SlavedIdTracker(
+                db_conn,
+                "device_lists_stream",
+                "stream_id",
+                extra_tables=[
+                    ("user_signature_stream", "stream_id"),
+                    ("device_lists_outbound_pokes", "stream_id"),
+                    ("device_lists_changes_in_room", "stream_id"),
+                ],
+            )
 
         # Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a
         # StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker).

synapse/storage/databases/main/events_worker.py

+21-14
@@ -59,6 +59,7 @@
     run_as_background_process,
     wrap_as_background_process,
 )
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import BackfillStream
 from synapse.replication.tcp.streams.events import EventsStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -212,20 +213,26 @@ def __init__(
             # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
             # updated over replication. (Multiple writers are not supported for
             # SQLite).
-            self._stream_id_gen = StreamIdGenerator(
-                db_conn,
-                "events",
-                "stream_ordering",
-                is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
-            )
-            self._backfill_id_gen = StreamIdGenerator(
-                db_conn,
-                "events",
-                "stream_ordering",
-                step=-1,
-                extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
-                is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
-            )
+            if hs.get_instance_name() in hs.config.worker.writers.events:
+                self._stream_id_gen = StreamIdGenerator(
+                    db_conn,
+                    "events",
+                    "stream_ordering",
+                )
+                self._backfill_id_gen = StreamIdGenerator(
+                    db_conn,
+                    "events",
+                    "stream_ordering",
+                    step=-1,
+                    extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
+                )
+            else:
+                self._stream_id_gen = SlavedIdTracker(
+                    db_conn, "events", "stream_ordering"
+                )
+                self._backfill_id_gen = SlavedIdTracker(
+                    db_conn, "events", "stream_ordering", step=-1
+                )
 
         events_max = self._stream_id_gen.get_current_token()
         curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(

synapse/storage/databases/main/push_rule.py

+9-8
@@ -30,6 +30,7 @@
 
 from synapse.api.errors import StoreError
 from synapse.config.homeserver import ExperimentalConfig
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import PushRulesStream
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import (
@@ -110,14 +111,14 @@ def __init__(
     ):
         super().__init__(database, db_conn, hs)
 
-        # In the worker store this is an ID tracker which we overwrite in the non-worker
-        # class below that is used on the main process.
-        self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
-            db_conn,
-            "push_rules_stream",
-            "stream_id",
-            is_writer=hs.config.worker.worker_app is None,
-        )
+        if hs.config.worker.worker_app is None:
+            self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
+                db_conn, "push_rules_stream", "stream_id"
+            )
+        else:
+            self._push_rules_stream_id_gen = SlavedIdTracker(
+                db_conn, "push_rules_stream", "stream_id"
+            )
 
         push_rules_prefill, push_rules_id = self.db_pool.get_cache_dict(
             db_conn,

synapse/storage/databases/main/pusher.py

+15-9
@@ -27,6 +27,7 @@
 )
 
 from synapse.push import PusherConfig, ThrottleParams
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import PushersStream
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import (
@@ -58,15 +59,20 @@ def __init__(
     ):
         super().__init__(database, db_conn, hs)
 
-        # In the worker store this is an ID tracker which we overwrite in the non-worker
-        # class below that is used on the main process.
-        self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
-            db_conn,
-            "pushers",
-            "id",
-            extra_tables=[("deleted_pushers", "stream_id")],
-            is_writer=hs.config.worker.worker_app is None,
-        )
+        if hs.config.worker.worker_app is None:
+            self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
+                db_conn,
+                "pushers",
+                "id",
+                extra_tables=[("deleted_pushers", "stream_id")],
+            )
+        else:
+            self._pushers_id_gen = SlavedIdTracker(
+                db_conn,
+                "pushers",
+                "id",
+                extra_tables=[("deleted_pushers", "stream_id")],
+            )
 
         self.db_pool.updates.register_background_update_handler(
             "remove_deactivated_pushers",

synapse/storage/databases/main/receipts.py

+9-9
@@ -27,6 +27,7 @@
 )
 
 from synapse.api.constants import EduTypes
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import ReceiptsStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
@@ -60,9 +61,6 @@ def __init__(
         hs: "HomeServer",
     ):
         self._instance_name = hs.get_instance_name()
-
-        # In the worker store this is an ID tracker which we overwrite in the non-worker
-        # class below that is used on the main process.
         self._receipts_id_gen: AbstractStreamIdTracker
 
         if isinstance(database.engine, PostgresEngine):
@@ -89,12 +87,14 @@ def __init__(
             # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
             # updated over replication. (Multiple writers are not supported for
             # SQLite).
-            self._receipts_id_gen = StreamIdGenerator(
-                db_conn,
-                "receipts_linearized",
-                "stream_id",
-                is_writer=hs.get_instance_name() in hs.config.worker.writers.receipts,
-            )
+            if hs.get_instance_name() in hs.config.worker.writers.receipts:
+                self._receipts_id_gen = StreamIdGenerator(
+                    db_conn, "receipts_linearized", "stream_id"
+                )
+            else:
+                self._receipts_id_gen = SlavedIdTracker(
+                    db_conn, "receipts_linearized", "stream_id"
+                )
 
         super().__init__(database, db_conn, hs)
 
synapse/storage/util/id_generators.py

+3-10
@@ -186,13 +186,11 @@ def __init__(
         column: str,
         extra_tables: Iterable[Tuple[str, str]] = (),
         step: int = 1,
-        is_writer: bool = True,
     ) -> None:
         assert step != 0
         self._lock = threading.Lock()
         self._step: int = step
         self._current: int = _load_current_id(db_conn, table, column, step)
-        self._is_writer = is_writer
         for table, column in extra_tables:
             self._current = (max if step > 0 else min)(
                 self._current, _load_current_id(db_conn, table, column, step)
@@ -206,11 +204,9 @@ def __init__(
         self._unfinished_ids: OrderedDict[int, int] = OrderedDict()
 
     def advance(self, instance_name: str, new_id: int) -> None:
-        # Advance should never be called on a writer instance, only over replication
-        if self._is_writer:
-            raise Exception("Replication is not supported by writer StreamIdGenerator")
-
-        self._current = (max if self._step > 0 else min)(self._current, new_id)
+        # `StreamIdGenerator` should only be used when there is a single writer,
+        # so replication should never happen.
+        raise Exception("Replication is not supported by StreamIdGenerator")
 
     def get_next(self) -> AsyncContextManager[int]:
         with self._lock:
@@ -253,9 +249,6 @@ def manager() -> Generator[Sequence[int], None, None]:
         return _AsyncCtxManagerWrapper(manager())
 
     def get_current_token(self) -> int:
-        if self._is_writer:
-            return self._current
-
         with self._lock:
             if self._unfinished_ids:
                 return next(iter(self._unfinished_ids)) - self._step
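
With `is_writer` gone, `StreamIdGenerator` is strictly single-writer again, and its `get_current_token` must never advertise an ID whose transaction has not finished persisting, which is why the token is derived from the oldest unfinished ID. A simplified, synchronous sketch of that bookkeeping (hypothetical code, not the real class, which hands out async context managers):

import threading
from collections import OrderedDict
from contextlib import contextmanager
from typing import Iterator


class SingleWriterIdGen:
    """Hypothetical, simplified single-writer stream ID generator."""

    def __init__(self, current: int, step: int = 1) -> None:
        self._lock = threading.Lock()
        self._step = step
        self._current = current
        # Allocated-but-not-yet-persisted IDs, in allocation order.
        self._unfinished_ids: "OrderedDict[int, int]" = OrderedDict()

    @contextmanager
    def get_next(self) -> Iterator[int]:
        with self._lock:
            self._current += self._step
            new_id = self._current
            self._unfinished_ids[new_id] = new_id
        try:
            yield new_id
        finally:
            with self._lock:
                self._unfinished_ids.pop(new_id)

    def get_current_token(self) -> int:
        with self._lock:
            if self._unfinished_ids:
                # Never advertise past an ID that is still being persisted.
                return next(iter(self._unfinished_ids)) - self._step
            return self._current


gen = SingleWriterIdGen(5)
with gen.get_next() as stream_id:
    assert stream_id == 6
    # While 6 is in flight, readers still see 5.
    assert gen.get_current_token() == 5
assert gen.get_current_token() == 6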
