Skip to content

Commit b3b7937

Browse files
authored
Fix sync waiting for an invalid token from the "future" (#17386)
Fixes #17274, hopefully. Basically, old versions of Synapse could advance streams without persisting anything in the DB (fixed in #17229). On restart, those updates would get lost, and so the position of the stream would revert to an older position. If this happened across an upgrade to a later Synapse version that included #17215, then sync could get blocked indefinitely (until the stream advanced to the position in the token). We fix this by bounding the stream positions we will wait for to the maximum position of the underlying stream ID generator.
1 parent 9c8f1a6 commit b3b7937

File tree

17 files changed

+229
-31
lines changed

17 files changed

+229
-31
lines changed

Diff for: changelog.d/17386.bugfix

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix bug where `/sync` requests could get blocked indefinitely after an upgrade from Synapse versions before v1.109.0.

Diff for: synapse/notifier.py

+7
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,13 @@ async def check_for_updates(
764764

765765
async def wait_for_stream_token(self, stream_token: StreamToken) -> bool:
766766
"""Wait for this worker to catch up with the given stream token."""
767+
current_token = self.event_sources.get_current_token()
768+
if stream_token.is_before_or_eq(current_token):
769+
return True
770+
771+
# Work around a bug where older Synapse versions gave out tokens "from
772+
# the future", i.e. that are ahead of the tokens persisted in the DB.
773+
stream_token = await self.event_sources.bound_future_token(stream_token)
767774

768775
start = self.clock.time_msec()
769776
while True:

Diff for: synapse/storage/databases/main/account_data.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,7 @@
4343
)
4444
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
4545
from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
46-
from synapse.storage.util.id_generators import (
47-
AbstractStreamIdGenerator,
48-
MultiWriterIdGenerator,
49-
)
46+
from synapse.storage.util.id_generators import MultiWriterIdGenerator
5047
from synapse.types import JsonDict, JsonMapping
5148
from synapse.util import json_encoder
5249
from synapse.util.caches.descriptors import cached
@@ -71,7 +68,7 @@ def __init__(
7168
self._instance_name in hs.config.worker.writers.account_data
7269
)
7370

74-
self._account_data_id_gen: AbstractStreamIdGenerator
71+
self._account_data_id_gen: MultiWriterIdGenerator
7572

7673
self._account_data_id_gen = MultiWriterIdGenerator(
7774
db_conn=db_conn,
@@ -113,6 +110,9 @@ def get_max_account_data_stream_id(self) -> int:
113110
"""
114111
return self._account_data_id_gen.get_current_token()
115112

113+
def get_account_data_id_generator(self) -> MultiWriterIdGenerator:
114+
return self._account_data_id_gen
115+
116116
@cached()
117117
async def get_global_account_data_for_user(
118118
self, user_id: str

Diff for: synapse/storage/databases/main/deviceinbox.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,7 @@
5050
LoggingTransaction,
5151
make_in_list_sql_clause,
5252
)
53-
from synapse.storage.util.id_generators import (
54-
AbstractStreamIdGenerator,
55-
MultiWriterIdGenerator,
56-
)
53+
from synapse.storage.util.id_generators import MultiWriterIdGenerator
5754
from synapse.types import JsonDict
5855
from synapse.util import json_encoder
5956
from synapse.util.caches.expiringcache import ExpiringCache
@@ -92,7 +89,7 @@ def __init__(
9289
self._instance_name in hs.config.worker.writers.to_device
9390
)
9491

95-
self._to_device_msg_id_gen: AbstractStreamIdGenerator = MultiWriterIdGenerator(
92+
self._to_device_msg_id_gen: MultiWriterIdGenerator = MultiWriterIdGenerator(
9693
db_conn=db_conn,
9794
db=database,
9895
notifier=hs.get_replication_notifier(),
@@ -169,6 +166,9 @@ def process_replication_position(
169166
def get_to_device_stream_token(self) -> int:
170167
return self._to_device_msg_id_gen.get_current_token()
171168

169+
def get_to_device_id_generator(self) -> MultiWriterIdGenerator:
170+
return self._to_device_msg_id_gen
171+
172172
async def get_messages_for_user_devices(
173173
self,
174174
user_ids: Collection[str],

Diff for: synapse/storage/databases/main/devices.py

+3
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,9 @@ def device_lists_in_rooms_have_changed(
243243
def get_device_stream_token(self) -> int:
244244
return self._device_list_id_gen.get_current_token()
245245

246+
def get_device_stream_id_generator(self) -> MultiWriterIdGenerator:
247+
return self._device_list_id_gen
248+
246249
async def count_devices_by_users(
247250
self, user_ids: Optional[Collection[str]] = None
248251
) -> int:

Diff for: synapse/storage/databases/main/events_worker.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,8 @@ def __init__(
192192
):
193193
super().__init__(database, db_conn, hs)
194194

195-
self._stream_id_gen: AbstractStreamIdGenerator
196-
self._backfill_id_gen: AbstractStreamIdGenerator
195+
self._stream_id_gen: MultiWriterIdGenerator
196+
self._backfill_id_gen: MultiWriterIdGenerator
197197

198198
self._stream_id_gen = MultiWriterIdGenerator(
199199
db_conn=db_conn,

Diff for: synapse/storage/databases/main/presence.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,7 @@
4242
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
4343
from synapse.storage.engines._base import IsolationLevel
4444
from synapse.storage.types import Connection
45-
from synapse.storage.util.id_generators import (
46-
AbstractStreamIdGenerator,
47-
MultiWriterIdGenerator,
48-
)
45+
from synapse.storage.util.id_generators import MultiWriterIdGenerator
4946
from synapse.util.caches.descriptors import cached, cachedList
5047
from synapse.util.caches.stream_change_cache import StreamChangeCache
5148
from synapse.util.iterutils import batch_iter
@@ -83,7 +80,7 @@ def __init__(
8380
super().__init__(database, db_conn, hs)
8481

8582
self._instance_name = hs.get_instance_name()
86-
self._presence_id_gen: AbstractStreamIdGenerator
83+
self._presence_id_gen: MultiWriterIdGenerator
8784

8885
self._can_persist_presence = (
8986
self._instance_name in hs.config.worker.writers.presence
@@ -455,6 +452,9 @@ async def get_presence_for_all_users(
455452
def get_current_presence_token(self) -> int:
456453
return self._presence_id_gen.get_current_token()
457454

455+
def get_presence_stream_id_gen(self) -> MultiWriterIdGenerator:
456+
return self._presence_id_gen
457+
458458
def _get_active_presence(self, db_conn: Connection) -> List[UserPresenceState]:
459459
"""Fetch non-offline presence from the database so that we can register
460460
the appropriate time outs.

Diff for: synapse/storage/databases/main/push_rule.py

+3
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,9 @@ def get_max_push_rules_stream_id(self) -> int:
178178
"""
179179
return self._push_rules_stream_id_gen.get_current_token()
180180

181+
def get_push_rules_stream_id_gen(self) -> MultiWriterIdGenerator:
182+
return self._push_rules_stream_id_gen
183+
181184
def process_replication_rows(
182185
self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
183186
) -> None:

Diff for: synapse/storage/databases/main/receipts.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,7 @@
4545
LoggingTransaction,
4646
)
4747
from synapse.storage.engines._base import IsolationLevel
48-
from synapse.storage.util.id_generators import (
49-
AbstractStreamIdGenerator,
50-
MultiWriterIdGenerator,
51-
)
48+
from synapse.storage.util.id_generators import MultiWriterIdGenerator
5249
from synapse.types import (
5350
JsonDict,
5451
JsonMapping,
@@ -76,7 +73,7 @@ def __init__(
7673

7774
# In the worker store this is an ID tracker which we overwrite in the non-worker
7875
# class below that is used on the main process.
79-
self._receipts_id_gen: AbstractStreamIdGenerator
76+
self._receipts_id_gen: MultiWriterIdGenerator
8077

8178
self._can_write_to_receipts = (
8279
self._instance_name in hs.config.worker.writers.receipts
@@ -136,6 +133,9 @@ def get_max_receipt_stream_id(self) -> MultiWriterStreamToken:
136133
def get_receipt_stream_id_for_instance(self, instance_name: str) -> int:
137134
return self._receipts_id_gen.get_current_token_for_writer(instance_name)
138135

136+
def get_receipts_stream_id_gen(self) -> MultiWriterIdGenerator:
137+
return self._receipts_id_gen
138+
139139
def get_last_unthreaded_receipt_for_user_txn(
140140
self,
141141
txn: LoggingTransaction,

Diff for: synapse/storage/databases/main/room.py

+5-6
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,7 @@
5959
)
6060
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
6161
from synapse.storage.types import Cursor
62-
from synapse.storage.util.id_generators import (
63-
AbstractStreamIdGenerator,
64-
IdGenerator,
65-
MultiWriterIdGenerator,
66-
)
62+
from synapse.storage.util.id_generators import IdGenerator, MultiWriterIdGenerator
6763
from synapse.types import JsonDict, RetentionPolicy, StrCollection, ThirdPartyInstanceID
6864
from synapse.util import json_encoder
6965
from synapse.util.caches.descriptors import cached, cachedList
@@ -151,7 +147,7 @@ def __init__(
151147

152148
self.config: HomeServerConfig = hs.config
153149

154-
self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator
150+
self._un_partial_stated_rooms_stream_id_gen: MultiWriterIdGenerator
155151

156152
self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
157153
db_conn=db_conn,
@@ -1409,6 +1405,9 @@ def get_un_partial_stated_rooms_token(self, instance_name: str) -> int:
14091405
instance_name
14101406
)
14111407

1408+
def get_un_partial_stated_rooms_id_generator(self) -> MultiWriterIdGenerator:
1409+
return self._un_partial_stated_rooms_stream_id_gen
1410+
14121411
async def get_un_partial_stated_rooms_between(
14131412
self, last_id: int, current_id: int, room_ids: Collection[str]
14141413
) -> Set[str]:

Diff for: synapse/storage/databases/main/stream.py

+3
Original file line numberDiff line numberDiff line change
@@ -577,6 +577,9 @@ def get_room_max_token(self) -> RoomStreamToken:
577577

578578
return RoomStreamToken(stream=min_pos, instance_map=immutabledict(positions))
579579

580+
def get_events_stream_id_generator(self) -> MultiWriterIdGenerator:
581+
return self._stream_id_gen
582+
580583
async def get_room_events_stream_for_rooms(
581584
self,
582585
room_ids: Collection[str],

Diff for: synapse/storage/util/id_generators.py

+5
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,11 @@ def _update_stream_positions_table_txn(self, txn: Cursor) -> None:
812812
pos = self.get_current_token_for_writer(self._instance_name)
813813
txn.execute(sql, (self._stream_name, self._instance_name, pos))
814814

815+
async def get_max_allocated_token(self) -> int:
816+
return await self._db.runInteraction(
817+
"get_max_allocated_token", self._sequence_gen.get_max_allocated
818+
)
819+
815820

816821
@attr.s(frozen=True, auto_attribs=True)
817822
class _AsyncCtxManagerWrapper(Generic[T]):

Diff for: synapse/storage/util/sequence.py

+24
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ def check_consistency(
8888
"""
8989
...
9090

91+
@abc.abstractmethod
92+
def get_max_allocated(self, txn: Cursor) -> int:
93+
"""Get the maximum ID that we have allocated"""
94+
9195

9296
class PostgresSequenceGenerator(SequenceGenerator):
9397
"""An implementation of SequenceGenerator which uses a postgres sequence"""
@@ -190,6 +194,17 @@ def check_consistency(
190194
% {"seq": self._sequence_name, "stream_name": stream_name}
191195
)
192196

197+
def get_max_allocated(self, txn: Cursor) -> int:
198+
# We just read from the sequence what the last value we fetched was.
199+
txn.execute(f"SELECT last_value, is_called FROM {self._sequence_name}")
200+
row = txn.fetchone()
201+
assert row is not None
202+
203+
last_value, is_called = row
204+
if not is_called:
205+
last_value -= 1
206+
return last_value
207+
193208

194209
GetFirstCallbackType = Callable[[Cursor], int]
195210

@@ -248,6 +263,15 @@ def check_consistency(
248263
# There is nothing to do for in memory sequences
249264
pass
250265

266+
def get_max_allocated(self, txn: Cursor) -> int:
267+
with self._lock:
268+
if self._current_max_id is None:
269+
assert self._callback is not None
270+
self._current_max_id = self._callback(txn)
271+
self._callback = None
272+
273+
return self._current_max_id
274+
251275

252276
def build_sequence_generator(
253277
db_conn: "LoggingDatabaseConnection",

Diff for: synapse/streams/events.py

+63-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,12 @@
3030
from synapse.handlers.typing import TypingNotificationEventSource
3131
from synapse.logging.opentracing import trace
3232
from synapse.streams import EventSource
33-
from synapse.types import MultiWriterStreamToken, StreamKeyType, StreamToken
33+
from synapse.types import (
34+
AbstractMultiWriterStreamToken,
35+
MultiWriterStreamToken,
36+
StreamKeyType,
37+
StreamToken,
38+
)
3439

3540
if TYPE_CHECKING:
3641
from synapse.server import HomeServer
@@ -91,6 +96,63 @@ def get_current_token(self) -> StreamToken:
9196
)
9297
return token
9398

99+
async def bound_future_token(self, token: StreamToken) -> StreamToken:
100+
"""Bound a token that is ahead of the current token to the maximum
101+
persisted values.
102+
103+
This ensures that if we wait for the given token we know the stream will
104+
eventually advance to that point.
105+
106+
This works around a bug where older Synapse versions will give out
107+
tokens for streams, and then after a restart will give back tokens where
108+
the stream has "gone backwards".
109+
"""
110+
111+
current_token = self.get_current_token()
112+
113+
stream_key_to_id_gen = {
114+
StreamKeyType.ROOM: self.store.get_events_stream_id_generator(),
115+
StreamKeyType.PRESENCE: self.store.get_presence_stream_id_gen(),
116+
StreamKeyType.RECEIPT: self.store.get_receipts_stream_id_gen(),
117+
StreamKeyType.ACCOUNT_DATA: self.store.get_account_data_id_generator(),
118+
StreamKeyType.PUSH_RULES: self.store.get_push_rules_stream_id_gen(),
119+
StreamKeyType.TO_DEVICE: self.store.get_to_device_id_generator(),
120+
StreamKeyType.DEVICE_LIST: self.store.get_device_stream_id_generator(),
121+
StreamKeyType.UN_PARTIAL_STATED_ROOMS: self.store.get_un_partial_stated_rooms_id_generator(),
122+
}
123+
124+
for _, key in StreamKeyType.__members__.items():
125+
if key == StreamKeyType.TYPING:
126+
# Typing stream is allowed to "reset", and so comparisons don't
127+
# really make sense as is.
128+
# TODO: Figure out a better way of tracking resets.
129+
continue
130+
131+
token_value = token.get_field(key)
132+
current_value = current_token.get_field(key)
133+
134+
if isinstance(token_value, AbstractMultiWriterStreamToken):
135+
assert type(current_value) is type(token_value)
136+
137+
if not token_value.is_before_or_eq(current_value): # type: ignore[arg-type]
138+
max_token = await stream_key_to_id_gen[
139+
key
140+
].get_max_allocated_token()
141+
142+
token = token.copy_and_replace(
143+
key, token.room_key.bound_stream_token(max_token)
144+
)
145+
else:
146+
assert isinstance(current_value, int)
147+
if current_value < token_value:
148+
max_token = await stream_key_to_id_gen[
149+
key
150+
].get_max_allocated_token()
151+
152+
token = token.copy_and_replace(key, min(token_value, max_token))
153+
154+
return token
155+
94156
@trace
95157
async def get_start_token_for_pagination(self, room_id: str) -> StreamToken:
96158
"""Get the start token for a given room to be used to paginate

Diff for: synapse/types/__init__.py

+18
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,16 @@ def is_before_or_eq(self, other_token: Self) -> bool:
536536

537537
return True
538538

539+
def bound_stream_token(self, max_stream: int) -> "Self":
540+
"""Bound the stream positions to a maximum value"""
541+
542+
return type(self)(
543+
stream=min(self.stream, max_stream),
544+
instance_map=immutabledict(
545+
{k: min(s, max_stream) for k, s in self.instance_map.items()}
546+
),
547+
)
548+
539549

540550
@attr.s(frozen=True, slots=True, order=False)
541551
class RoomStreamToken(AbstractMultiWriterStreamToken):
@@ -722,6 +732,14 @@ async def to_string(self, store: "DataStore") -> str:
722732
else:
723733
return "s%d" % (self.stream,)
724734

735+
def bound_stream_token(self, max_stream: int) -> "RoomStreamToken":
736+
"""See super class"""
737+
738+
# This only makes sense for stream tokens.
739+
assert self.topological is None
740+
741+
return super().bound_stream_token(max_stream)
742+
725743

726744
@attr.s(frozen=True, slots=True, order=False)
727745
class MultiWriterStreamToken(AbstractMultiWriterStreamToken):

0 commit comments

Comments (0)