|
35 | 35 |
|
36 | 36 | from synapse.metrics import LaterGauge
|
37 | 37 | from synapse.metrics.background_process_metrics import run_as_background_process
|
38 |
| -from synapse.replication.tcp.client import DirectTcpReplicationClientFactory |
39 | 38 | from synapse.replication.tcp.commands import (
|
40 | 39 | ClearUserSyncsCommand,
|
41 | 40 | Command,
|
@@ -332,46 +331,31 @@ async def _process_command(
|
332 | 331 |
|
333 | 332 | def start_replication(self, hs: "HomeServer") -> None:
|
334 | 333 | """Helper method to start replication."""
|
335 |
| - if hs.config.redis.redis_enabled: |
336 |
| - from synapse.replication.tcp.redis import ( |
337 |
| - RedisDirectTcpReplicationClientFactory, |
338 |
| - ) |
| 334 | + from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory |
339 | 335 |
|
340 |
| - # First let's ensure that we have a ReplicationStreamer started. |
341 |
| - hs.get_replication_streamer() |
| 336 | + # First let's ensure that we have a ReplicationStreamer started. |
| 337 | + hs.get_replication_streamer() |
342 | 338 |
|
343 |
| - # We need two connections to redis, one for the subscription stream and |
344 |
| - # one to send commands to (as you can't send further redis commands to a |
345 |
| - # connection after SUBSCRIBE is called). |
| 339 | + # We need two connections to redis, one for the subscription stream and |
| 340 | + # one to send commands to (as you can't send further redis commands to a |
| 341 | + # connection after SUBSCRIBE is called). |
346 | 342 |
|
347 |
| - # First create the connection for sending commands. |
348 |
| - outbound_redis_connection = hs.get_outbound_redis_connection() |
| 343 | + # First create the connection for sending commands. |
| 344 | + outbound_redis_connection = hs.get_outbound_redis_connection() |
349 | 345 |
|
350 |
| - # Now create the factory/connection for the subscription stream. |
351 |
| - self._factory = RedisDirectTcpReplicationClientFactory( |
352 |
| - hs, |
353 |
| - outbound_redis_connection, |
354 |
| - channel_names=self._channels_to_subscribe_to, |
355 |
| - ) |
356 |
| - hs.get_reactor().connectTCP( |
357 |
| - hs.config.redis.redis_host, |
358 |
| - hs.config.redis.redis_port, |
359 |
| - self._factory, |
360 |
| - timeout=30, |
361 |
| - bindAddress=None, |
362 |
| - ) |
363 |
| - else: |
364 |
| - client_name = hs.get_instance_name() |
365 |
| - self._factory = DirectTcpReplicationClientFactory(hs, client_name, self) |
366 |
| - host = hs.config.worker.worker_replication_host |
367 |
| - port = hs.config.worker.worker_replication_port |
368 |
| - hs.get_reactor().connectTCP( |
369 |
| - host, |
370 |
| - port, |
371 |
| - self._factory, |
372 |
| - timeout=30, |
373 |
| - bindAddress=None, |
374 |
| - ) |
| 346 | + # Now create the factory/connection for the subscription stream. |
| 347 | + self._factory = RedisDirectTcpReplicationClientFactory( |
| 348 | + hs, |
| 349 | + outbound_redis_connection, |
| 350 | + channel_names=self._channels_to_subscribe_to, |
| 351 | + ) |
| 352 | + hs.get_reactor().connectTCP( |
| 353 | + hs.config.redis.redis_host, |
| 354 | + hs.config.redis.redis_port, |
| 355 | + self._factory, |
| 356 | + timeout=30, |
| 357 | + bindAddress=None, |
| 358 | + ) |
375 | 359 |
|
376 | 360 | def get_streams(self) -> Dict[str, Stream]:
|
377 | 361 | """Get a map from stream name to all streams."""
|
|
0 commit comments