Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 1e45305

Browse files
authored
Rename storage classes (#12913)
1 parent e541bb9 commit 1e45305

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+708
-551
lines changed

changelog.d/12913.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Rename storage classes.

synapse/events/snapshot.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from synapse.types import JsonDict, StateMap
2323

2424
if TYPE_CHECKING:
25-
from synapse.storage import Storage
25+
from synapse.storage.controllers import StorageControllers
2626
from synapse.storage.databases.main import DataStore
2727
from synapse.storage.state import StateFilter
2828

@@ -84,7 +84,7 @@ class EventContext:
8484
incomplete state.
8585
"""
8686

87-
_storage: "Storage"
87+
_storage: "StorageControllers"
8888
rejected: Union[Literal[False], str] = False
8989
_state_group: Optional[int] = None
9090
state_group_before_event: Optional[int] = None
@@ -97,7 +97,7 @@ class EventContext:
9797

9898
@staticmethod
9999
def with_state(
100-
storage: "Storage",
100+
storage: "StorageControllers",
101101
state_group: Optional[int],
102102
state_group_before_event: Optional[int],
103103
state_delta_due_to_event: Optional[StateMap[str]],
@@ -117,7 +117,7 @@ def with_state(
117117

118118
@staticmethod
119119
def for_outlier(
120-
storage: "Storage",
120+
storage: "StorageControllers",
121121
) -> "EventContext":
122122
"""Return an EventContext instance suitable for persisting an outlier event"""
123123
return EventContext(storage=storage)
@@ -147,7 +147,7 @@ async def serialize(self, event: EventBase, store: "DataStore") -> JsonDict:
147147
}
148148

149149
@staticmethod
150-
def deserialize(storage: "Storage", input: JsonDict) -> "EventContext":
150+
def deserialize(storage: "StorageControllers", input: JsonDict) -> "EventContext":
151151
"""Converts a dict that was produced by `serialize` back into a
152152
EventContext.
153153

synapse/federation/federation_server.py

-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ def __init__(self, hs: "HomeServer"):
109109
super().__init__(hs)
110110

111111
self.handler = hs.get_federation_handler()
112-
self.storage = hs.get_storage()
113112
self._spam_checker = hs.get_spam_checker()
114113
self._federation_event_handler = hs.get_federation_event_handler()
115114
self.state = hs.get_state_handler()

synapse/handlers/admin.py

+8-4
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
class AdminHandler:
3131
def __init__(self, hs: "HomeServer"):
3232
self.store = hs.get_datastores().main
33-
self.storage = hs.get_storage()
34-
self.state_storage = self.storage.state
33+
self._storage_controllers = hs.get_storage_controllers()
34+
self._state_storage_controller = self._storage_controllers.state
3535

3636
async def get_whois(self, user: UserID) -> JsonDict:
3737
connections = []
@@ -197,7 +197,9 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") ->
197197

198198
from_key = events[-1].internal_metadata.after
199199

200-
events = await filter_events_for_client(self.storage, user_id, events)
200+
events = await filter_events_for_client(
201+
self._storage_controllers, user_id, events
202+
)
201203

202204
writer.write_events(room_id, events)
203205

@@ -233,7 +235,9 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") ->
233235
for event_id in extremities:
234236
if not event_to_unseen_prevs[event_id]:
235237
continue
236-
state = await self.state_storage.get_state_for_event(event_id)
238+
state = await self._state_storage_controller.get_state_for_event(
239+
event_id
240+
)
237241
writer.write_state(room_id, event_id, state)
238242

239243
return writer.finished()

synapse/handlers/device.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def __init__(self, hs: "HomeServer"):
7171
self.store = hs.get_datastores().main
7272
self.notifier = hs.get_notifier()
7373
self.state = hs.get_state_handler()
74-
self.state_storage = hs.get_storage().state
74+
self._state_storage = hs.get_storage_controllers().state
7575
self._auth_handler = hs.get_auth_handler()
7676
self.server_name = hs.hostname
7777

@@ -204,7 +204,7 @@ async def get_user_ids_changed(
204204
continue
205205

206206
# mapping from event_id -> state_dict
207-
prev_state_ids = await self.state_storage.get_state_ids_for_events(
207+
prev_state_ids = await self._state_storage.get_state_ids_for_events(
208208
event_ids
209209
)
210210

synapse/handlers/events.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ async def get_stream(
139139
class EventHandler:
140140
def __init__(self, hs: "HomeServer"):
141141
self.store = hs.get_datastores().main
142-
self.storage = hs.get_storage()
142+
self._storage_controllers = hs.get_storage_controllers()
143143

144144
async def get_event(
145145
self,
@@ -177,7 +177,7 @@ async def get_event(
177177
is_peeking = user.to_string() not in users
178178

179179
filtered = await filter_events_for_client(
180-
self.storage, user.to_string(), [event], is_peeking=is_peeking
180+
self._storage_controllers, user.to_string(), [event], is_peeking=is_peeking
181181
)
182182

183183
if not filtered:

synapse/handlers/federation.py

+18-12
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ def __init__(self, hs: "HomeServer"):
125125
self.hs = hs
126126

127127
self.store = hs.get_datastores().main
128-
self.storage = hs.get_storage()
129-
self.state_storage = self.storage.state
128+
self._storage_controllers = hs.get_storage_controllers()
129+
self._state_storage_controller = self._storage_controllers.state
130130
self.federation_client = hs.get_federation_client()
131131
self.state_handler = hs.get_state_handler()
132132
self.server_name = hs.hostname
@@ -324,7 +324,7 @@ async def _maybe_backfill_inner(
324324
# We set `check_history_visibility_only` as we might otherwise get false
325325
# positives from users having been erased.
326326
filtered_extremities = await filter_events_for_server(
327-
self.storage,
327+
self._storage_controllers,
328328
self.server_name,
329329
events_to_check,
330330
redact=False,
@@ -660,7 +660,7 @@ async def do_knock(
660660
# in the invitee's sync stream. It is stripped out for all other local users.
661661
event.unsigned["knock_room_state"] = stripped_room_state["knock_state_events"]
662662

663-
context = EventContext.for_outlier(self.storage)
663+
context = EventContext.for_outlier(self._storage_controllers)
664664
stream_id = await self._federation_event_handler.persist_events_and_notify(
665665
event.room_id, [(event, context)]
666666
)
@@ -849,7 +849,7 @@ async def on_invite_request(
849849
)
850850
)
851851

852-
context = EventContext.for_outlier(self.storage)
852+
context = EventContext.for_outlier(self._storage_controllers)
853853
await self._federation_event_handler.persist_events_and_notify(
854854
event.room_id, [(event, context)]
855855
)
@@ -878,7 +878,7 @@ async def do_remotely_reject_invite(
878878

879879
await self.federation_client.send_leave(host_list, event)
880880

881-
context = EventContext.for_outlier(self.storage)
881+
context = EventContext.for_outlier(self._storage_controllers)
882882
stream_id = await self._federation_event_handler.persist_events_and_notify(
883883
event.room_id, [(event, context)]
884884
)
@@ -1027,7 +1027,7 @@ async def get_state_ids_for_pdu(self, room_id: str, event_id: str) -> List[str]:
10271027
if event.internal_metadata.outlier:
10281028
raise NotFoundError("State not known at event %s" % (event_id,))
10291029

1030-
state_groups = await self.state_storage.get_state_groups_ids(
1030+
state_groups = await self._state_storage_controller.get_state_groups_ids(
10311031
room_id, [event_id]
10321032
)
10331033

@@ -1078,7 +1078,9 @@ async def on_backfill_request(
10781078
],
10791079
)
10801080

1081-
events = await filter_events_for_server(self.storage, origin, events)
1081+
events = await filter_events_for_server(
1082+
self._storage_controllers, origin, events
1083+
)
10821084

10831085
return events
10841086

@@ -1109,7 +1111,9 @@ async def get_persisted_pdu(
11091111
if not in_room:
11101112
raise AuthError(403, "Host not in room.")
11111113

1112-
events = await filter_events_for_server(self.storage, origin, [event])
1114+
events = await filter_events_for_server(
1115+
self._storage_controllers, origin, [event]
1116+
)
11131117
event = events[0]
11141118
return event
11151119
else:
@@ -1138,7 +1142,7 @@ async def on_get_missing_events(
11381142
)
11391143

11401144
missing_events = await filter_events_for_server(
1141-
self.storage, origin, missing_events
1145+
self._storage_controllers, origin, missing_events
11421146
)
11431147

11441148
return missing_events
@@ -1480,9 +1484,11 @@ async def _sync_partial_state_room(
14801484
# clear the lazy-loading flag.
14811485
logger.info("Updating current state for %s", room_id)
14821486
assert (
1483-
self.storage.persistence is not None
1487+
self._storage_controllers.persistence is not None
14841488
), "TODO(faster_joins): support for workers"
1485-
await self.storage.persistence.update_current_state(room_id)
1489+
await self._storage_controllers.persistence.update_current_state(
1490+
room_id
1491+
)
14861492

14871493
logger.info("Clearing partial-state flag for %s", room_id)
14881494
success = await self.store.clear_partial_state_room(room_id)

synapse/handlers/federation_event.py

+17-10
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ class FederationEventHandler:
9898

9999
def __init__(self, hs: "HomeServer"):
100100
self._store = hs.get_datastores().main
101-
self._storage = hs.get_storage()
102-
self._state_storage = self._storage.state
101+
self._storage_controllers = hs.get_storage_controllers()
102+
self._state_storage_controller = self._storage_controllers.state
103103

104104
self._state_handler = hs.get_state_handler()
105105
self._event_creation_handler = hs.get_event_creation_handler()
@@ -535,7 +535,9 @@ async def update_state_for_partial_state_event(
535535
)
536536
return
537537
await self._store.update_state_for_partial_state_event(event, context)
538-
self._state_storage.notify_event_un_partial_stated(event.event_id)
538+
self._state_storage_controller.notify_event_un_partial_stated(
539+
event.event_id
540+
)
539541

540542
async def backfill(
541543
self, dest: str, room_id: str, limit: int, extremities: Collection[str]
@@ -835,7 +837,9 @@ async def _resolve_state_at_missing_prevs(
835837

836838
try:
837839
# Get the state of the events we know about
838-
ours = await self._state_storage.get_state_groups_ids(room_id, seen)
840+
ours = await self._state_storage_controller.get_state_groups_ids(
841+
room_id, seen
842+
)
839843

840844
# state_maps is a list of mappings from (type, state_key) to event_id
841845
state_maps: List[StateMap[str]] = list(ours.values())
@@ -1436,7 +1440,7 @@ def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]:
14361440
# we're not bothering about room state, so flag the event as an outlier.
14371441
event.internal_metadata.outlier = True
14381442

1439-
context = EventContext.for_outlier(self._storage)
1443+
context = EventContext.for_outlier(self._storage_controllers)
14401444
try:
14411445
validate_event_for_room_version(room_version_obj, event)
14421446
check_auth_rules_for_event(room_version_obj, event, auth)
@@ -1613,7 +1617,7 @@ async def _check_for_soft_fail(
16131617
# given state at the event. This should correctly handle cases
16141618
# like bans, especially with state res v2.
16151619

1616-
state_sets_d = await self._state_storage.get_state_groups_ids(
1620+
state_sets_d = await self._state_storage_controller.get_state_groups_ids(
16171621
event.room_id, extrem_ids
16181622
)
16191623
state_sets: List[StateMap[str]] = list(state_sets_d.values())
@@ -1885,7 +1889,7 @@ async def _update_context_for_auth_events(
18851889

18861890
# create a new state group as a delta from the existing one.
18871891
prev_group = context.state_group
1888-
state_group = await self._state_storage.store_state_group(
1892+
state_group = await self._state_storage_controller.store_state_group(
18891893
event.event_id,
18901894
event.room_id,
18911895
prev_group=prev_group,
@@ -1894,7 +1898,7 @@ async def _update_context_for_auth_events(
18941898
)
18951899

18961900
return EventContext.with_state(
1897-
storage=self._storage,
1901+
storage=self._storage_controllers,
18981902
state_group=state_group,
18991903
state_group_before_event=context.state_group_before_event,
19001904
state_delta_due_to_event=state_updates,
@@ -1984,11 +1988,14 @@ async def persist_events_and_notify(
19841988
)
19851989
return result["max_stream_id"]
19861990
else:
1987-
assert self._storage.persistence
1991+
assert self._storage_controllers.persistence
19881992

19891993
# Note that this returns the events that were persisted, which may not be
19901994
# the same as were passed in if some were deduplicated due to transaction IDs.
1991-
events, max_stream_token = await self._storage.persistence.persist_events(
1995+
(
1996+
events,
1997+
max_stream_token,
1998+
) = await self._storage_controllers.persistence.persist_events(
19921999
event_and_contexts, backfilled=backfilled
19932000
)
19942001

synapse/handlers/initial_sync.py

+10-7
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ def __init__(self, hs: "HomeServer"):
6767
]
6868
] = ResponseCache(hs.get_clock(), "initial_sync_cache")
6969
self._event_serializer = hs.get_event_client_serializer()
70-
self.storage = hs.get_storage()
71-
self.state_storage = self.storage.state
70+
self._storage_controllers = hs.get_storage_controllers()
71+
self._state_storage_controller = self._storage_controllers.state
7272

7373
async def snapshot_all_rooms(
7474
self,
@@ -198,7 +198,8 @@ async def handle_room(event: RoomsForUser) -> None:
198198
event.stream_ordering,
199199
)
200200
deferred_room_state = run_in_background(
201-
self.state_storage.get_state_for_events, [event.event_id]
201+
self._state_storage_controller.get_state_for_events,
202+
[event.event_id],
202203
).addCallback(
203204
lambda states: cast(StateMap[EventBase], states[event.event_id])
204205
)
@@ -218,7 +219,7 @@ async def handle_room(event: RoomsForUser) -> None:
218219
).addErrback(unwrapFirstError)
219220

220221
messages = await filter_events_for_client(
221-
self.storage, user_id, messages
222+
self._storage_controllers, user_id, messages
222223
)
223224

224225
start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
@@ -355,7 +356,9 @@ async def _room_initial_sync_parted(
355356
member_event_id: str,
356357
is_peeking: bool,
357358
) -> JsonDict:
358-
room_state = await self.state_storage.get_state_for_event(member_event_id)
359+
room_state = await self._state_storage_controller.get_state_for_event(
360+
member_event_id
361+
)
359362

360363
limit = pagin_config.limit if pagin_config else None
361364
if limit is None:
@@ -369,7 +372,7 @@ async def _room_initial_sync_parted(
369372
)
370373

371374
messages = await filter_events_for_client(
372-
self.storage, user_id, messages, is_peeking=is_peeking
375+
self._storage_controllers, user_id, messages, is_peeking=is_peeking
373376
)
374377

375378
start_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, token)
@@ -474,7 +477,7 @@ async def get_receipts() -> List[JsonDict]:
474477
)
475478

476479
messages = await filter_events_for_client(
477-
self.storage, user_id, messages, is_peeking=is_peeking
480+
self._storage_controllers, user_id, messages, is_peeking=is_peeking
478481
)
479482

480483
start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)

0 commit comments

Comments
 (0)