-
Notifications
You must be signed in to change notification settings - Fork 333
Fix bug where Sliding Sync could get stuck when using workers #17438
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
bf09110
d2c8d48
6d86303
31e6508
940b644
4bf6b06
0ccc295
2a49675
224a739
546c2a8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix rare bug where `/sync` would break for a user when using workers with multiple stream writers. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
# | ||
# | ||
import abc | ||
import logging | ||
import re | ||
import string | ||
from enum import Enum | ||
|
@@ -74,6 +75,9 @@ | |
from synapse.storage.databases.main import DataStore, PurgeEventsStore | ||
from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
# Define a state map type from type/state_key to T (usually an event ID or | ||
# event) | ||
T = TypeVar("T") | ||
|
@@ -454,6 +458,8 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta): | |
represented by a default `stream` attribute and a map of instance name to | ||
stream position of any writers that are ahead of the default stream | ||
position. | ||
|
||
The values in `instance_map` must be greater than the `stream` attribute. | ||
""" | ||
|
||
stream: int = attr.ib(validator=attr.validators.instance_of(int), kw_only=True) | ||
|
@@ -468,6 +474,15 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta): | |
kw_only=True, | ||
) | ||
|
||
def __attrs_post_init__(self) -> None: | ||
# Enforce that all instances have a value greater than the min stream | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. "greater than or equal to*" — likewise in the check below. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Actually, it's probably better to do a strict check. |
||
# position. | ||
for i, v in self.instance_map.items(): | ||
if v <= self.stream: | ||
raise ValueError( | ||
f"'instance_map' includes a stream position before the main 'stream' attribute. Instance: {i}" | ||
) | ||
|
||
@classmethod | ||
@abc.abstractmethod | ||
async def parse(cls, store: "DataStore", string: str) -> "Self": | ||
|
@@ -494,6 +509,9 @@ def copy_and_advance(self, other: "Self") -> "Self": | |
for instance in set(self.instance_map).union(other.instance_map) | ||
} | ||
|
||
# Filter out any redundant entries. | ||
instance_map = {i: s for i, s in instance_map.items() if s > max_stream} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is new — I forgot to unstash the change. It's now important since we do strict checks. |
||
|
||
return attr.evolve( | ||
self, stream=max_stream, instance_map=immutabledict(instance_map) | ||
) | ||
|
@@ -539,10 +557,15 @@ def is_before_or_eq(self, other_token: Self) -> bool: | |
def bound_stream_token(self, max_stream: int) -> "Self": | ||
"""Bound the stream positions to a maximum value""" | ||
|
||
min_pos = min(self.stream, max_stream) | ||
return type(self)( | ||
stream=min(self.stream, max_stream), | ||
stream=min_pos, | ||
instance_map=immutabledict( | ||
{k: min(s, max_stream) for k, s in self.instance_map.items()} | ||
{ | ||
k: min(s, max_stream) | ||
for k, s in self.instance_map.items() | ||
if min(s, max_stream) > min_pos | ||
} | ||
), | ||
) | ||
|
||
|
@@ -637,6 +660,8 @@ def __attrs_post_init__(self) -> None: | |
"Cannot set both 'topological' and 'instance_map' on 'RoomStreamToken'." | ||
) | ||
|
||
super().__attrs_post_init__() | ||
|
||
@classmethod | ||
async def parse(cls, store: "PurgeEventsStore", string: str) -> "RoomStreamToken": | ||
try: | ||
|
@@ -651,6 +676,11 @@ async def parse(cls, store: "PurgeEventsStore", string: str) -> "RoomStreamToken | |
|
||
instance_map = {} | ||
for part in parts[1:]: | ||
if not part: | ||
# Handle tokens of the form `m5~`, which were created by | ||
# a bug | ||
continue | ||
|
||
key, value = part.split(".") | ||
instance_id = int(key) | ||
pos = int(value) | ||
|
@@ -666,7 +696,10 @@ async def parse(cls, store: "PurgeEventsStore", string: str) -> "RoomStreamToken | |
except CancelledError: | ||
raise | ||
except Exception: | ||
pass | ||
# We log an exception here as even though this *might* be a client | ||
# handing us a bad token, it's more likely that Synapse returned a bad | ||
# token (and we really want to catch those!). | ||
logger.exception("Failed to parse stream token: %r", string) | ||
raise SynapseError(400, "Invalid room stream token %r" % (string,)) | ||
|
||
@classmethod | ||
|
@@ -713,6 +746,8 @@ def get_stream_pos_for_instance(self, instance_name: str) -> int: | |
return self.instance_map.get(instance_name, self.stream) | ||
|
||
async def to_string(self, store: "DataStore") -> str: | ||
"""See class level docstring for information about the format.""" | ||
|
||
if self.topological is not None: | ||
return "t%d-%d" % (self.topological, self.stream) | ||
elif self.instance_map: | ||
|
@@ -727,8 +762,10 @@ async def to_string(self, store: "DataStore") -> str: | |
instance_id = await store.get_id_for_instance(name) | ||
entries.append(f"{instance_id}.{pos}") | ||
|
||
encoded_map = "~".join(entries) | ||
return f"m{self.stream}~{encoded_map}" | ||
if entries: | ||
encoded_map = "~".join(entries) | ||
return f"m{self.stream}~{encoded_map}" | ||
return f"s{self.stream}" | ||
anoadragon453 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
else: | ||
return "s%d" % (self.stream,) | ||
|
||
|
@@ -756,6 +793,11 @@ async def parse(cls, store: "DataStore", string: str) -> "MultiWriterStreamToken | |
|
||
instance_map = {} | ||
for part in parts[1:]: | ||
if not part: | ||
# Handle tokens of the form `m5~`, which were created by | ||
# a bug | ||
continue | ||
|
||
key, value = part.split(".") | ||
instance_id = int(key) | ||
pos = int(value) | ||
|
@@ -770,10 +812,15 @@ async def parse(cls, store: "DataStore", string: str) -> "MultiWriterStreamToken | |
except CancelledError: | ||
raise | ||
except Exception: | ||
pass | ||
# We log an exception here as even though this *might* be a client | ||
# handing us a bad token, it's more likely that Synapse returned a bad | ||
# token (and we really want to catch those!). | ||
logger.exception("Failed to parse stream token: %r", string) | ||
raise SynapseError(400, "Invalid stream token %r" % (string,)) | ||
|
||
async def to_string(self, store: "DataStore") -> str: | ||
"""See class level docstring for information about the format.""" | ||
|
||
if self.instance_map: | ||
entries = [] | ||
for name, pos in self.instance_map.items(): | ||
|
@@ -786,8 +833,10 @@ async def to_string(self, store: "DataStore") -> str: | |
instance_id = await store.get_id_for_instance(name) | ||
entries.append(f"{instance_id}.{pos}") | ||
|
||
encoded_map = "~".join(entries) | ||
return f"m{self.stream}~{encoded_map}" | ||
if entries: | ||
encoded_map = "~".join(entries) | ||
return f"m{self.stream}~{encoded_map}" | ||
return str(self.stream) | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
else: | ||
return str(self.stream) | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused how this previous code could result in an
instance_map
that "contained entries from before the minimum token." We're getting the minimum in that map and using the same map. I could see how it could include entries at the minimum
stream_ordering
(and those not being necessary) but not before. Did this spot actually cause bugs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, so if all the instances are equal to the minimum it will cause a problem. The exact bug here is that during serialization we filtered out any instances with a value equal to the minimum, and then if that filtered all instances out we wrote an invalid token (we wrote e.g.
m54~
instead ofs54
).