Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 33702fa

Browse files
committed
Restructure the way we subscribe to channels
1 parent cfa55ed commit 33702fa

File tree

2 files changed

+30
-16
lines changed

2 files changed

+30
-16
lines changed

synapse/replication/tcp/handler.py

+30-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Copyright 2017 Vector Creations Ltd
2-
# Copyright 2020 The Matrix.org Foundation C.I.C.
2+
# Copyright 2020, 2022 The Matrix.org Foundation C.I.C.
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License");
55
# you may not use this file except in compliance with the License.
@@ -101,6 +101,9 @@ def __init__(self, hs: "HomeServer"):
101101
self._instance_id = hs.get_instance_id()
102102
self._instance_name = hs.get_instance_name()
103103

104+
# Additional Redis channel suffixes to subscribe to.
105+
self._channels_to_subscribe_to: List[str] = []
106+
104107
self._is_presence_writer = (
105108
hs.get_instance_name() in hs.config.worker.writers.presence
106109
)
@@ -243,6 +246,31 @@ def __init__(self, hs: "HomeServer"):
243246
# If we're NOT using Redis, this must be handled by the master
244247
self._should_insert_client_ips = hs.get_instance_name() == "master"
245248

249+
if self._is_master or self._should_insert_client_ips:
250+
self.subscribe_to_channel("USER_IP")
251+
252+
def subscribe_to_channel(self, channel_name: str) -> None:
253+
"""
254+
Indicates that we wish to subscribe to a Redis channel by name.
255+
256+
(The name will later be prefixed with the server name; i.e. subscribing
257+
to the 'ABC' channel actually subscribes to 'example.com/ABC' Redis-side.)
258+
259+
Raises:
260+
- If replication has already started, then it's too late to subscribe
261+
to new channels.
262+
"""
263+
264+
if self._factory is not None:
265+
# We don't allow subscribing after the fact to avoid the chance
266+
# of missing an important message because we didn't subscribe in time.
267+
raise RuntimeError(
268+
"Cannot subscribe to more channels after replication started."
269+
)
270+
271+
if channel_name not in self._channels_to_subscribe_to:
272+
self._channels_to_subscribe_to.append(channel_name)
273+
246274
def _add_command_to_stream_queue(
247275
self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand]
248276
) -> None:
@@ -323,9 +351,7 @@ def start_replication(self, hs: "HomeServer") -> None:
323351
self._factory = RedisDirectTcpReplicationClientFactory(
324352
hs,
325353
outbound_redis_connection,
326-
channel_names=RedisDirectTcpReplicationClientFactory.channels_to_subscribe_to_for_config(
327-
hs.config
328-
),
354+
channel_names=self._channels_to_subscribe_to,
329355
)
330356
hs.get_reactor().connectTCP(
331357
hs.config.redis.redis_host,

synapse/replication/tcp/redis.py

-12
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
from twisted.internet.interfaces import IAddress, IConnector
2525
from twisted.python.failure import Failure
2626

27-
from synapse.config.homeserver import HomeServerConfig
2827
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
2928
from synapse.metrics.background_process_metrics import (
3029
BackgroundProcessLoggingContext,
@@ -345,17 +344,6 @@ def __init__(
345344

346345
self.synapse_outbound_redis_connection = outbound_redis_connection
347346

348-
@staticmethod
349-
def channels_to_subscribe_to_for_config(config: HomeServerConfig) -> List[str]:
350-
subscribe_to = []
351-
352-
if config.worker.run_background_tasks or config.worker.worker_app is None:
353-
# If we're the main process or the background worker, we want to process
354-
# User IP addresses
355-
subscribe_to.append("USER_IP")
356-
357-
return subscribe_to
358-
359347
def buildProtocol(self, addr: IAddress) -> RedisSubscriber:
360348
p = super().buildProtocol(addr)
361349
p = cast(RedisSubscriber, p)

0 commit comments

Comments
 (0)