Skip to content

Commit 0ebe9cb

Browse files
authored
Merge pull request #1329 from absinthe-graphql/bryanjos/unbounded_concurrency_fix
Add async option to Absinthe.Subscription
2 parents 0c5d315 + 83742f6 commit 0ebe9cb

File tree

4 files changed

+37
-17
lines changed

4 files changed

+37
-17
lines changed

CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# Changelog
22

3+
## Unreleased
4+
- Feature: [Add async option to Absinthe.Subscription](https://github.com/absinthe-graphql/absinthe/pull/1329)
5+
- Bug Fix: [Avoid table scans on registry](https://github.com/absinthe-graphql/absinthe/pull/1330)
6+
- Big Fix: [Unregsiter duplicate (listening to the same topic) subscriptions individually](https://github.com/absinthe-graphql/absinthe/pull/1336)
7+
38
## 1.7.8
49

510
- Bugfix: Fixes an issue where schemas would not find their types, or not be found at all.

lib/absinthe/subscription/proxy.ex

+18-10
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@ defmodule Absinthe.Subscription.Proxy do
66
defstruct [
77
:pubsub,
88
:node,
9-
:task_super
9+
:task_super,
10+
:async
1011
]
1112

12-
def child_spec([_, _, shard] = args) do
13+
def child_spec([_task_super, _pubsub, shard, _async] = args) do
1314
%{
1415
id: {__MODULE__, shard},
1516
start: {__MODULE__, :start_link, [args]}
@@ -26,11 +27,11 @@ defmodule Absinthe.Subscription.Proxy do
2627

2728
def topic(shard), do: "__absinthe__:proxy:#{shard}"
2829

29-
def init([task_super, pubsub, shard]) do
30+
def init([task_super, pubsub, shard, async]) do
3031
node_name = pubsub.node_name()
3132
:ok = pubsub.subscribe(topic(shard))
3233
Process.send_after(self(), :gc, @gc_interval)
33-
{:ok, %__MODULE__{pubsub: pubsub, node: node_name, task_super: task_super}}
34+
{:ok, %__MODULE__{pubsub: pubsub, node: node_name, task_super: task_super, async: async}}
3435
end
3536

3637
def handle_info(:gc, state) do
@@ -42,13 +43,20 @@ defmodule Absinthe.Subscription.Proxy do
4243
def handle_info(payload, state) do
4344
# There's no meaningful form of backpressure to have here, and we can't
4445
# bottleneck execution inside each proxy process
45-
4646
unless payload.node == state.pubsub.node_name() do
47-
Task.Supervisor.start_child(state.task_super, Subscription.Local, :publish_mutation, [
48-
state.pubsub,
49-
payload.mutation_result,
50-
payload.subscribed_fields
51-
])
47+
if state.async do
48+
Task.Supervisor.start_child(state.task_super, Subscription.Local, :publish_mutation, [
49+
state.pubsub,
50+
payload.mutation_result,
51+
payload.subscribed_fields
52+
])
53+
else
54+
Subscription.Local.publish_mutation(
55+
state.pubsub,
56+
payload.mutation_result,
57+
payload.subscribed_fields
58+
)
59+
end
5260
end
5361

5462
{:noreply, state}

lib/absinthe/subscription/proxy_supervisor.ex

+4-4
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,18 @@ defmodule Absinthe.Subscription.ProxySupervisor do
33

44
use Supervisor
55

6-
def start_link([pubsub, registry, pool_size]) do
7-
Supervisor.start_link(__MODULE__, {pubsub, registry, pool_size})
6+
def start_link([pubsub, registry, pool_size, async]) do
7+
Supervisor.start_link(__MODULE__, {pubsub, registry, pool_size, async})
88
end
99

10-
def init({pubsub, registry, pool_size}) do
10+
def init({pubsub, registry, pool_size, async}) do
1111
task_super_name = Module.concat(registry, TaskSuper)
1212
task_super = {Task.Supervisor, name: task_super_name}
1313

1414
# Shard numbers are generated by phash2 which is 0-based:
1515
proxies =
1616
for shard <- 0..(pool_size - 1) do
17-
{Absinthe.Subscription.Proxy, [task_super_name, pubsub, shard]}
17+
{Absinthe.Subscription.Proxy, [task_super_name, pubsub, shard, async]}
1818
end
1919

2020
Supervisor.init([task_super | proxies], strategy: :one_for_one)

lib/absinthe/subscription/supervisor.ex

+10-3
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,17 @@ defmodule Absinthe.Subscription.Supervisor do
2424
pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2)
2525
compress_registry? = Keyword.get(opts, :compress_registry?, true)
2626

27-
Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?})
27+
# Absinthe.Subscription.Proxy listens for subscription messages
28+
# from other nodes and then runs Subscription.Local.publish_mutation to process
29+
# the mutation on the local node. By default it runs in a task superivsor so that
30+
# requests are handled concurrently. However, this may not work for some
31+
# systems. Setting `async` to false makes it so that the requests are processed one at a time.
32+
async? = Keyword.get(opts, :async, true)
33+
34+
Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?, async?})
2835
end
2936

30-
def init({pubsub, pool_size, compress_registry?}) do
37+
def init({pubsub, pool_size, compress_registry?, async?}) do
3138
registry_name = Absinthe.Subscription.registry_name(pubsub)
3239
meta = [pool_size: pool_size]
3340

@@ -40,7 +47,7 @@ defmodule Absinthe.Subscription.Supervisor do
4047
meta: meta,
4148
compressed: compress_registry?
4249
]},
43-
{Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size]}
50+
{Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size, async?]}
4451
]
4552

4653
Supervisor.init(children, strategy: :one_for_one)

0 commit comments

Comments
 (0)