Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit d85cba1

Browse files
realtyemreivilibre
andauthored
Add all Stream Writer worker types to configure_workers_and_start.py (#14197)
Co-authored-by: reivilibre <[email protected]>
1 parent 5853d79 commit d85cba1

File tree

2 files changed

+70
-7
lines changed

2 files changed

+70
-7
lines changed

changelog.d/14197.docker

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add all Stream Writer worker types to configure_workers_and_start.py.

docker/configure_workers_and_start.py

+69-7
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,12 @@
5050

5151
MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
5252

53-
53+
# Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources
54+
# Watching /_matrix/client needs a "client" listener
55+
# Watching /_matrix/federation needs a "federation" listener
56+
# Watching /_matrix/media and related needs a "media" listener
57+
# Stream Writers require "client" and "replication" listeners because they
58+
# have to attach by instance_map to the master process and have client endpoints.
5459
WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
5560
"pusher": {
5661
"app": "synapse.app.pusher",
@@ -209,6 +214,49 @@
209214
% (MAIN_PROCESS_HTTP_LISTENER_PORT,)
210215
),
211216
},
217+
"account_data": {
218+
"app": "synapse.app.generic_worker",
219+
"listener_resources": ["client", "replication"],
220+
"endpoint_patterns": [
221+
"^/_matrix/client/(r0|v3|unstable)/.*/tags",
222+
"^/_matrix/client/(r0|v3|unstable)/.*/account_data",
223+
],
224+
"shared_extra_conf": {},
225+
"worker_extra_conf": "",
226+
},
227+
"presence": {
228+
"app": "synapse.app.generic_worker",
229+
"listener_resources": ["client", "replication"],
230+
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
231+
"shared_extra_conf": {},
232+
"worker_extra_conf": "",
233+
},
234+
"receipts": {
235+
"app": "synapse.app.generic_worker",
236+
"listener_resources": ["client", "replication"],
237+
"endpoint_patterns": [
238+
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
239+
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
240+
],
241+
"shared_extra_conf": {},
242+
"worker_extra_conf": "",
243+
},
244+
"to_device": {
245+
"app": "synapse.app.generic_worker",
246+
"listener_resources": ["client", "replication"],
247+
"endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
248+
"shared_extra_conf": {},
249+
"worker_extra_conf": "",
250+
},
251+
"typing": {
252+
"app": "synapse.app.generic_worker",
253+
"listener_resources": ["client", "replication"],
254+
"endpoint_patterns": [
255+
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"
256+
],
257+
"shared_extra_conf": {},
258+
"worker_extra_conf": "",
259+
},
212260
}
213261

214262
# Templates for sections that may be inserted multiple times in config files
@@ -271,7 +319,7 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
271319
outfile.write(rendered)
272320

273321

274-
def add_sharding_to_shared_config(
322+
def add_worker_roles_to_shared_config(
275323
shared_config: dict,
276324
worker_type: str,
277325
worker_name: str,
@@ -309,6 +357,20 @@ def add_sharding_to_shared_config(
309357
"port": worker_port,
310358
}
311359

360+
elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]:
361+
# Update the list of stream writers
362+
# It's convienent that the name of the worker type is the same as the event stream
363+
shared_config.setdefault("stream_writers", {}).setdefault(
364+
worker_type, []
365+
).append(worker_name)
366+
367+
# Map of stream writer instance names to host/ports combos
368+
# For now, all stream writers need http replication ports
369+
instance_map[worker_name] = {
370+
"host": "localhost",
371+
"port": worker_port,
372+
}
373+
312374
elif worker_type == "media_repository":
313375
# The first configured media worker will run the media background jobs
314376
shared_config.setdefault("media_instance_running_background_jobs", worker_name)
@@ -441,11 +503,11 @@ def generate_worker_files(
441503

442504
# Check if more than one instance of this worker type has been specified
443505
worker_type_total_count = worker_types.count(worker_type)
444-
if worker_type_total_count > 1:
445-
# Update the shared config with sharding-related options if necessary
446-
add_sharding_to_shared_config(
447-
shared_config, worker_type, worker_name, worker_port
448-
)
506+
507+
# Update the shared config with sharding-related options if necessary
508+
add_worker_roles_to_shared_config(
509+
shared_config, worker_type, worker_name, worker_port
510+
)
449511

450512
# Enable the worker in supervisord
451513
worker_descriptors.append(worker_config)

0 commit comments

Comments
 (0)