Skip to content

Async generators always ensure that inner generator are closed properly #230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 22 additions & 17 deletions gql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,11 @@ async def _subscribe(
# before a break if python version is too old (pypy3 py 3.6.1)
self._generator = inner_generator

async for result in inner_generator:
if result.errors:
# Note: we need to run generator.aclose() here or the finally block in
# transport.subscribe will not be reached in pypy3 (py 3.6.1)
await inner_generator.aclose()

yield result
try:
async for result in inner_generator:
yield result
finally:
await inner_generator.aclose()

async def subscribe(
self, document: DocumentNode, *args, **kwargs
Expand All @@ -372,17 +370,24 @@ async def subscribe(

The extra arguments are passed to the transport subscribe method."""

# Validate and subscribe on the transport
async for result in self._subscribe(document, *args, **kwargs):

# Raise an error if an error is returned in the ExecutionResult object
if result.errors:
raise TransportQueryError(
str(result.errors[0]), errors=result.errors, data=result.data
)
inner_generator: AsyncGenerator[ExecutionResult, None] = self._subscribe(
document, *args, **kwargs
)

elif result.data is not None:
yield result.data
try:
# Validate and subscribe on the transport
async for result in inner_generator:

# Raise an error if an error is returned in the ExecutionResult object
if result.errors:
raise TransportQueryError(
str(result.errors[0]), errors=result.errors, data=result.data
)

elif result.data is not None:
yield result.data
finally:
await inner_generator.aclose()

async def _execute(
self, document: DocumentNode, *args, **kwargs
Expand Down
3 changes: 2 additions & 1 deletion tests/test_websocket_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ async def test_websocket_subscription_break(

if count <= 5:
# Note: the following line is only necessary for pypy3 v3.6.1
await session._generator.aclose()
if sys.version_info < (3, 7):
await session._generator.aclose()
break

count -= 1
Expand Down