Skip to content

Commit 4d2f7e7

Browse files
authored
Merge pull request #184 from ciscorn/stop-asyncgen
Cancel asyncio iteration task on unsubscription
2 parents 3a92b04 + ffd3b98 commit 4d2f7e7

File tree

2 files changed

+18
-120
lines changed

2 files changed

+18
-120
lines changed

graphql/execution/executors/asyncio.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,5 +62,5 @@ def execute(self, fn, *args, **kwargs):
6262
self.futures.append(future)
6363
return Promise.resolve(future)
6464
elif isasyncgen(result):
65-
return asyncgen_to_observable(result)
65+
return asyncgen_to_observable(result, loop=self.loop)
6666
return result
+17-119
Original file line numberDiff line numberDiff line change
@@ -1,135 +1,33 @@
11
from inspect import isasyncgen
2-
from asyncio import ensure_future
3-
from rx import Observable, AnonymousObserver
4-
from rx.core import ObservableBase, Disposable, ObserverBase
2+
from asyncio import ensure_future, wait, CancelledError
3+
from rx import AnonymousObservable
54

6-
from rx.concurrency import current_thread_scheduler
75

8-
from rx.core import Observer, Observable, Disposable
9-
from rx.core.anonymousobserver import AnonymousObserver
10-
from rx.core.autodetachobserver import AutoDetachObserver
11-
12-
13-
# class AsyncgenDisposable(Disposable):
14-
# """Represents a Disposable that disposes the asyncgen automatically."""
15-
16-
# def __init__(self, asyncgen):
17-
# """Initializes a new instance of the AsyncgenDisposable class."""
18-
19-
# self.asyncgen = asyncgen
20-
# self.is_disposed = False
21-
22-
# super(AsyncgenDisposable, self).__init__()
23-
24-
# def dispose(self):
25-
# """Sets the status to disposed"""
26-
# self.asyncgen.aclose()
27-
# self.is_disposed = True
28-
29-
30-
class AsyncgenObserver(AutoDetachObserver):
31-
def __init__(self, asyncgen, *args, **kwargs):
32-
self._asyncgen = asyncgen
33-
self.is_disposed = False
34-
super(AsyncgenObserver, self).__init__(*args, **kwargs)
35-
36-
async def dispose_asyncgen(self):
37-
if self.is_disposed:
38-
return
39-
40-
try:
41-
# await self._asyncgen.aclose()
42-
await self._asyncgen.athrow(StopAsyncIteration)
43-
self.is_disposed = True
44-
except:
45-
pass
46-
47-
def dispose(self):
48-
if self.is_disposed:
49-
return
50-
disposed = super(AsyncgenObserver, self).dispose()
51-
# print("DISPOSE observer!", disposed)
52-
ensure_future(self.dispose_asyncgen())
53-
54-
55-
class AsyncgenObservable(ObservableBase):
56-
"""Class to create an Observable instance from a delegate-based
57-
implementation of the Subscribe method."""
58-
59-
def __init__(self, subscribe, asyncgen):
60-
"""Creates an observable sequence object from the specified
61-
subscription function.
62-
63-
Keyword arguments:
64-
:param types.FunctionType subscribe: Subscribe method implementation.
65-
"""
66-
67-
self._subscribe = subscribe
68-
self._asyncgen = asyncgen
69-
super(AsyncgenObservable, self).__init__()
70-
71-
def _subscribe_core(self, observer):
72-
# print("GET SUBSCRIBER", observer)
73-
return self._subscribe(observer)
74-
# print("SUBSCRIBER RESULT", subscriber)
75-
# return subscriber
76-
77-
def subscribe(self, on_next=None, on_error=None, on_completed=None, observer=None):
78-
79-
if isinstance(on_next, Observer):
80-
observer = on_next
81-
elif hasattr(on_next, "on_next") and callable(on_next.on_next):
82-
observer = on_next
83-
elif not observer:
84-
observer = AnonymousObserver(on_next, on_error, on_completed)
85-
86-
auto_detach_observer = AsyncgenObserver(self._asyncgen, observer)
87-
88-
def fix_subscriber(subscriber):
89-
"""Fixes subscriber to make sure it returns a Disposable instead
90-
of None or a dispose function"""
91-
92-
if not hasattr(subscriber, "dispose"):
93-
subscriber = Disposable.create(subscriber)
94-
95-
return subscriber
96-
97-
def set_disposable(scheduler=None, value=None):
98-
try:
99-
subscriber = self._subscribe_core(auto_detach_observer)
100-
except Exception as ex:
101-
if not auto_detach_observer.fail(ex):
102-
raise
103-
else:
104-
auto_detach_observer.disposable = fix_subscriber(subscriber)
6+
def asyncgen_to_observable(asyncgen, loop=None):
7+
def emit(observer):
8+
task = ensure_future(
9+
iterate_asyncgen(asyncgen, observer),
10+
loop=loop)
10511

106-
# Subscribe needs to set up the trampoline before for subscribing.
107-
# Actually, the first call to Subscribe creates the trampoline so
108-
# that it may assign its disposable before any observer executes
109-
# OnNext over the CurrentThreadScheduler. This enables single-
110-
# threaded cancellation
111-
# https://social.msdn.microsoft.com/Forums/en-US/eb82f593-9684-4e27-
112-
# 97b9-8b8886da5c33/whats-the-rationale-behind-how-currentthreadsche
113-
# dulerschedulerequired-behaves?forum=rx
114-
if current_thread_scheduler.schedule_required():
115-
current_thread_scheduler.schedule(set_disposable)
116-
else:
117-
set_disposable()
12+
def dispose():
13+
async def await_task():
14+
await task
11815

119-
# Hide the identity of the auto detach observer
120-
return Disposable.create(auto_detach_observer.dispose)
16+
task.cancel()
17+
ensure_future(await_task(), loop=loop)
12118

19+
return dispose
12220

123-
def asyncgen_to_observable(asyncgen):
124-
def emit(observer):
125-
ensure_future(iterate_asyncgen(asyncgen, observer))
126-
return AsyncgenObservable(emit, asyncgen)
21+
return AnonymousObservable(emit)
12722

12823

12924
async def iterate_asyncgen(asyncgen, observer):
13025
try:
13126
async for item in asyncgen:
13227
observer.on_next(item)
13328
observer.on_completed()
29+
except CancelledError:
30+
pass
13431
except Exception as e:
13532
observer.on_error(e)
33+

0 commit comments

Comments
 (0)