This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 115f0eb

David Robertson, Fizzadar, and erikjohnston authored
Reintroduce #14376, with bugfix for monoliths (#14468)
* Add tests for StreamIdGenerator
* Drive-by: annotate all defs
* Revert "Revert "Remove slaved id tracker (#14376)" (#14463)". This reverts commit d63814f, which in turn reverted 36097e8. This restores the latter.
* Fix StreamIdGenerator not handling unpersisted IDs. Spotted by @erikjohnston. Closes #14456.
* Changelog

Co-authored-by: Nick Mills-Barrett <[email protected]>
Co-authored-by: Erik Johnston <[email protected]>
1 parent c15e9a0 commit 115f0eb
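
The thread running through the per-file diffs below: every store previously branched between a writer-side StreamIdGenerator and a read-only SlavedIdTracker, and this commit folds the tracker role into StreamIdGenerator behind a new is_writer flag. The following self-contained toy illustrates the intended semantics; it is a sketch only, not the real Synapse class, which takes a db_conn and loads its position from the database.

    import threading


    class ToyStreamIdGenerator:
        """Toy model of StreamIdGenerator's new is_writer behaviour (sketch only)."""

        def __init__(self, current: int, step: int = 1, is_writer: bool = True) -> None:
            assert step != 0
            self._lock = threading.Lock()
            self._step = step
            self._current = current
            self._is_writer = is_writer

        def advance(self, instance_name: str, new_id: int) -> None:
            # Writers allocate IDs locally, so being fed positions over
            # replication would be a bug; the exception is kept for them.
            if self._is_writer:
                raise Exception("Replication is not supported by writer StreamIdGenerator")
            # Non-writers (the old SlavedIdTracker role) just follow the stream.
            self._current = (max if self._step > 0 else min)(self._current, new_id)

        def get_current_token(self) -> int:
            # A non-writer reports the last position it saw over replication.
            with self._lock:
                return self._current


    # Worker-side usage: track the writer's position as replication rows arrive.
    tracker = ToyStreamIdGenerator(current=10, is_writer=False)
    tracker.advance("master", 15)
    assert tracker.get_current_token() == 15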

File tree

14 files changed: +230 -186 lines


changelog.d/14376.misc

+1
@@ -0,0 +1 @@
+Remove old stream ID tracking code. Contributed by Nick @Beeper (@fizzadar).

changelog.d/14468.misc

+1
@@ -0,0 +1 @@
+Remove old stream ID tracking code. Contributed by Nick @Beeper (@fizzadar).

mypy.ini

+3
@@ -117,6 +117,9 @@ disallow_untyped_defs = True
 [mypy-tests.state.test_profile]
 disallow_untyped_defs = True
 
+[mypy-tests.storage.test_id_generators]
+disallow_untyped_defs = True
+
 [mypy-tests.storage.test_profile]
 disallow_untyped_defs = True
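
For context, disallow_untyped_defs = True makes mypy reject any def without full annotations, which is why the "annotate all defs" drive-by accompanies this config entry. A hypothetical illustration (ours, not code from the commit):

    # What the flag enforces in tests/storage/test_id_generators.py:
    class StreamIdGeneratorTestCase:
        def test_initial_value(self) -> None:  # "-> None" and typed params are required
            ...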

synapse/replication/slave/__init__.py

-13
This file was deleted.

synapse/replication/slave/storage/__init__.py

-13
This file was deleted.

synapse/replication/slave/storage/_slaved_id_tracker.py

-50
This file was deleted.

synapse/storage/databases/main/account_data.py

+10-20
@@ -27,7 +27,6 @@
 )
 
 from synapse.api.constants import AccountDataTypes
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import AccountDataStream, TagAccountDataStream
 from synapse.storage._base import db_to_json
 from synapse.storage.database import (
@@ -68,12 +67,11 @@ def __init__(
         # to write account data. A value of `True` implies that `_account_data_id_gen`
         # is an `AbstractStreamIdGenerator` and not just a tracker.
         self._account_data_id_gen: AbstractStreamIdTracker
+        self._can_write_to_account_data = (
+            self._instance_name in hs.config.worker.writers.account_data
+        )
 
         if isinstance(database.engine, PostgresEngine):
-            self._can_write_to_account_data = (
-                self._instance_name in hs.config.worker.writers.account_data
-            )
-
             self._account_data_id_gen = MultiWriterIdGenerator(
                 db_conn=db_conn,
                 db=database,
@@ -95,21 +93,13 @@ def __init__(
             # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
             # updated over replication. (Multiple writers are not supported for
             # SQLite).
-            if self._instance_name in hs.config.worker.writers.account_data:
-                self._can_write_to_account_data = True
-                self._account_data_id_gen = StreamIdGenerator(
-                    db_conn,
-                    "room_account_data",
-                    "stream_id",
-                    extra_tables=[("room_tags_revisions", "stream_id")],
-                )
-            else:
-                self._account_data_id_gen = SlavedIdTracker(
-                    db_conn,
-                    "room_account_data",
-                    "stream_id",
-                    extra_tables=[("room_tags_revisions", "stream_id")],
-                )
+            self._account_data_id_gen = StreamIdGenerator(
+                db_conn,
+                "room_account_data",
+                "stream_id",
+                extra_tables=[("room_tags_revisions", "stream_id")],
+                is_writer=self._instance_name in hs.config.worker.writers.account_data,
+            )
 
         account_max = self.get_max_account_data_stream_id()
         self._account_data_stream_cache = StreamChangeCache(

synapse/storage/databases/main/devices.py

+13-23
@@ -38,7 +38,6 @@
     whitelisted_homeserver,
 )
 from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
@@ -86,28 +85,19 @@ def __init__(
     ):
         super().__init__(database, db_conn, hs)
 
-        if hs.config.worker.worker_app is None:
-            self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
-                db_conn,
-                "device_lists_stream",
-                "stream_id",
-                extra_tables=[
-                    ("user_signature_stream", "stream_id"),
-                    ("device_lists_outbound_pokes", "stream_id"),
-                    ("device_lists_changes_in_room", "stream_id"),
-                ],
-            )
-        else:
-            self._device_list_id_gen = SlavedIdTracker(
-                db_conn,
-                "device_lists_stream",
-                "stream_id",
-                extra_tables=[
-                    ("user_signature_stream", "stream_id"),
-                    ("device_lists_outbound_pokes", "stream_id"),
-                    ("device_lists_changes_in_room", "stream_id"),
-                ],
-            )
+        # In the worker store this is an ID tracker which we overwrite in the non-worker
+        # class below that is used on the main process.
+        self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
+            db_conn,
+            "device_lists_stream",
+            "stream_id",
+            extra_tables=[
+                ("user_signature_stream", "stream_id"),
+                ("device_lists_outbound_pokes", "stream_id"),
+                ("device_lists_changes_in_room", "stream_id"),
+            ],
+            is_writer=hs.config.worker.worker_app is None,
+        )
 
         # Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a
         # StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker).

synapse/storage/databases/main/events_worker.py

+14-21
@@ -59,7 +59,6 @@
     run_as_background_process,
     wrap_as_background_process,
 )
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import BackfillStream
 from synapse.replication.tcp.streams.events import EventsStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -213,26 +212,20 @@ def __init__(
             # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
             # updated over replication. (Multiple writers are not supported for
             # SQLite).
-            if hs.get_instance_name() in hs.config.worker.writers.events:
-                self._stream_id_gen = StreamIdGenerator(
-                    db_conn,
-                    "events",
-                    "stream_ordering",
-                )
-                self._backfill_id_gen = StreamIdGenerator(
-                    db_conn,
-                    "events",
-                    "stream_ordering",
-                    step=-1,
-                    extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
-                )
-            else:
-                self._stream_id_gen = SlavedIdTracker(
-                    db_conn, "events", "stream_ordering"
-                )
-                self._backfill_id_gen = SlavedIdTracker(
-                    db_conn, "events", "stream_ordering", step=-1
-                )
+            self._stream_id_gen = StreamIdGenerator(
+                db_conn,
+                "events",
+                "stream_ordering",
+                is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
+            )
+            self._backfill_id_gen = StreamIdGenerator(
+                db_conn,
+                "events",
+                "stream_ordering",
+                step=-1,
+                extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
+                is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
+            )
 
         events_max = self._stream_id_gen.get_current_token()
         curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(

synapse/storage/databases/main/push_rule.py

+8-9
@@ -30,7 +30,6 @@
 
 from synapse.api.errors import StoreError
 from synapse.config.homeserver import ExperimentalConfig
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import PushRulesStream
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import (
@@ -111,14 +110,14 @@ def __init__(
     ):
         super().__init__(database, db_conn, hs)
 
-        if hs.config.worker.worker_app is None:
-            self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
-                db_conn, "push_rules_stream", "stream_id"
-            )
-        else:
-            self._push_rules_stream_id_gen = SlavedIdTracker(
-                db_conn, "push_rules_stream", "stream_id"
-            )
+        # In the worker store this is an ID tracker which we overwrite in the non-worker
+        # class below that is used on the main process.
+        self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
+            db_conn,
+            "push_rules_stream",
+            "stream_id",
+            is_writer=hs.config.worker.worker_app is None,
+        )
 
         push_rules_prefill, push_rules_id = self.db_pool.get_cache_dict(
             db_conn,

synapse/storage/databases/main/pusher.py

+9-15
@@ -27,7 +27,6 @@
 )
 
 from synapse.push import PusherConfig, ThrottleParams
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import PushersStream
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import (
@@ -59,20 +58,15 @@ def __init__(
     ):
         super().__init__(database, db_conn, hs)
 
-        if hs.config.worker.worker_app is None:
-            self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
-                db_conn,
-                "pushers",
-                "id",
-                extra_tables=[("deleted_pushers", "stream_id")],
-            )
-        else:
-            self._pushers_id_gen = SlavedIdTracker(
-                db_conn,
-                "pushers",
-                "id",
-                extra_tables=[("deleted_pushers", "stream_id")],
-            )
+        # In the worker store this is an ID tracker which we overwrite in the non-worker
+        # class below that is used on the main process.
+        self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
+            db_conn,
+            "pushers",
+            "id",
+            extra_tables=[("deleted_pushers", "stream_id")],
+            is_writer=hs.config.worker.worker_app is None,
+        )
 
         self.db_pool.updates.register_background_update_handler(
             "remove_deactivated_pushers",

synapse/storage/databases/main/receipts.py

+9-9
@@ -27,7 +27,6 @@
 )
 
 from synapse.api.constants import EduTypes
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import ReceiptsStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
@@ -61,6 +60,9 @@ def __init__(
         hs: "HomeServer",
     ):
         self._instance_name = hs.get_instance_name()
+
+        # In the worker store this is an ID tracker which we overwrite in the non-worker
+        # class below that is used on the main process.
         self._receipts_id_gen: AbstractStreamIdTracker
 
         if isinstance(database.engine, PostgresEngine):
@@ -87,14 +89,12 @@ def __init__(
             # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
             # updated over replication. (Multiple writers are not supported for
             # SQLite).
-            if hs.get_instance_name() in hs.config.worker.writers.receipts:
-                self._receipts_id_gen = StreamIdGenerator(
-                    db_conn, "receipts_linearized", "stream_id"
-                )
-            else:
-                self._receipts_id_gen = SlavedIdTracker(
-                    db_conn, "receipts_linearized", "stream_id"
-                )
+            self._receipts_id_gen = StreamIdGenerator(
+                db_conn,
+                "receipts_linearized",
+                "stream_id",
+                is_writer=hs.get_instance_name() in hs.config.worker.writers.receipts,
+            )
 
         super().__init__(database, db_conn, hs)

synapse/storage/util/id_generators.py

+10-3
@@ -186,11 +186,13 @@ def __init__(
         column: str,
         extra_tables: Iterable[Tuple[str, str]] = (),
         step: int = 1,
+        is_writer: bool = True,
     ) -> None:
         assert step != 0
         self._lock = threading.Lock()
         self._step: int = step
         self._current: int = _load_current_id(db_conn, table, column, step)
+        self._is_writer = is_writer
         for table, column in extra_tables:
             self._current = (max if step > 0 else min)(
                 self._current, _load_current_id(db_conn, table, column, step)
@@ -204,9 +206,11 @@ def __init__(
         self._unfinished_ids: OrderedDict[int, int] = OrderedDict()
 
     def advance(self, instance_name: str, new_id: int) -> None:
-        # `StreamIdGenerator` should only be used when there is a single writer,
-        # so replication should never happen.
-        raise Exception("Replication is not supported by StreamIdGenerator")
+        # Advance should never be called on a writer instance, only over replication.
+        if self._is_writer:
+            raise Exception("Replication is not supported by writer StreamIdGenerator")
+
+        self._current = (max if self._step > 0 else min)(self._current, new_id)
 
     def get_next(self) -> AsyncContextManager[int]:
         with self._lock:
@@ -249,6 +253,9 @@ def manager() -> Generator[Sequence[int], None, None]:
         return _AsyncCtxManagerWrapper(manager())
 
     def get_current_token(self) -> int:
+        if not self._is_writer:
+            return self._current
+
         with self._lock:
             if self._unfinished_ids:
                 return next(iter(self._unfinished_ids)) - self._step
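
The get_current_token change above carries the unpersisted-IDs fix (#14456): a non-writer simply reports the position replicated to it, while a writer must not advertise IDs that get_next has handed out but not yet persisted, so the writer path still consults _unfinished_ids under the lock. A sketch of the property this implies; the harness and names are illustrative, not the actual tests/storage/test_id_generators.py added by this commit:

    # Assumes a writer StreamIdGenerator-like object exposing get_next()
    # and get_current_token(); run it with e.g.
    # await check_unpersisted_ids_not_advertised(store._stream_id_gen)
    async def check_unpersisted_ids_not_advertised(id_gen) -> None:
        start = id_gen.get_current_token()
        async with id_gen.get_next() as new_id:
            # While new_id is unpersisted, the advertised token must not
            # include it, or readers could observe gaps in the stream.
            assert id_gen.get_current_token() == start
        # Once the context manager exits, the ID counts as persisted.
        assert id_gen.get_current_token() == new_id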
