From 629257cbc98f69076f7e66a92a14ae84b8cdd06d Mon Sep 17 00:00:00 2001 From: Hanusz Leszek Date: Sat, 16 May 2020 14:43:36 +0200 Subject: [PATCH] Refactor the LocalSchemaTransport into an AsyncTransport --- gql/client.py | 4 -- gql/transport/local_schema.py | 72 +++++++++++++++++++++++------ gql/transport/transport.py | 9 ++-- tests/starwars/test_subscription.py | 47 ++++++++++++++----- tests/test_client.py | 4 +- 5 files changed, 99 insertions(+), 37 deletions(-) diff --git a/gql/client.py b/gql/client.py index 56090410..518a4c3b 100644 --- a/gql/client.py +++ b/gql/client.py @@ -1,5 +1,4 @@ import asyncio -from inspect import isawaitable from typing import Any, AsyncGenerator, Dict, Generator, Optional, Union, cast from graphql import ( @@ -116,9 +115,6 @@ def execute(self, document: DocumentNode, *args, **kwargs) -> Dict: result = self.transport.execute(document, *args, **kwargs) - assert not isawaitable(result), "Transport returned an awaitable result." - result = cast(ExecutionResult, result) - if result.errors: raise TransportQueryError(str(result.errors[0])) diff --git a/gql/transport/local_schema.py b/gql/transport/local_schema.py index 4d600fc6..54d71613 100644 --- a/gql/transport/local_schema.py +++ b/gql/transport/local_schema.py @@ -1,11 +1,12 @@ -from typing import Awaitable, Union +from inspect import isawaitable +from typing import Any, AsyncGenerator, AsyncIterator, Awaitable, Coroutine, cast -from graphql import DocumentNode, ExecutionResult, GraphQLSchema, execute +from graphql import DocumentNode, ExecutionResult, GraphQLSchema, execute, subscribe -from gql.transport import Transport +from gql.transport import AsyncTransport -class LocalSchemaTransport(Transport): +class LocalSchemaTransport(AsyncTransport): """A transport for executing GraphQL queries against a local schema.""" def __init__( @@ -17,14 +18,59 @@ def __init__( """ self.schema = schema - def execute( - self, document: DocumentNode, *args, **kwargs - ) -> Union[ExecutionResult, Awaitable[ExecutionResult]]: - """Execute the given document against the configured local schema. + async def connect(self): + """No connection needed on local transport + """ + pass + + async def close(self): + """No close needed on local transport + """ + pass - :param document: GraphQL query as AST Node object. - :param args: Positional options for execute method from graphql-core. - :param kwargs: Keyword options passed to execute method from graphql-core. - :return: ExecutionResult (either as value or awaitable) + async def execute( + self, document: DocumentNode, *args, **kwargs, + ) -> ExecutionResult: + """Execute the provided document AST for on a local GraphQL Schema. """ - return execute(self.schema, document, *args, **kwargs) + + result_or_awaitable = execute(self.schema, document, *args, **kwargs) + + execution_result: ExecutionResult + + if isawaitable(result_or_awaitable): + result_or_awaitable = cast(Awaitable[ExecutionResult], result_or_awaitable) + execution_result = await result_or_awaitable + else: + result_or_awaitable = cast(ExecutionResult, result_or_awaitable) + execution_result = result_or_awaitable + + return execution_result + + async def subscribe( + self, document: DocumentNode, *args, **kwargs, + ) -> AsyncGenerator[ExecutionResult, None]: + """Send a query and receive the results using an async generator + + The query can be a graphql query, mutation or subscription + + The results are sent as an ExecutionResult object + """ + + subscribe_result = subscribe(self.schema, document, *args, **kwargs) + + if isinstance(subscribe_result, ExecutionResult): + yield ExecutionResult + + else: + # if we don't get an ExecutionResult, then we should receive + # a Coroutine returning an AsyncIterator[ExecutionResult] + + subscribe_coro = cast( + Coroutine[Any, Any, AsyncIterator[ExecutionResult]], subscribe_result + ) + + subscribe_generator = await subscribe_coro + + async for result in subscribe_generator: + yield result diff --git a/gql/transport/transport.py b/gql/transport/transport.py index ec8bf103..828a0625 100644 --- a/gql/transport/transport.py +++ b/gql/transport/transport.py @@ -1,20 +1,17 @@ import abc -from typing import Awaitable, Union from graphql import DocumentNode, ExecutionResult class Transport: @abc.abstractmethod - def execute( - self, document: DocumentNode, *args, **kwargs - ) -> Union[ExecutionResult, Awaitable[ExecutionResult]]: + def execute(self, document: DocumentNode, *args, **kwargs) -> ExecutionResult: """Execute GraphQL query. Execute the provided document AST for either a remote or local GraphQL Schema. :param document: GraphQL query as AST Node or Document object. - :return: ExecutionResult (either as value or awaitable) + :return: ExecutionResult """ raise NotImplementedError( "Any Transport subclass must implement execute method" @@ -27,4 +24,4 @@ def close(self): from it. This is currently used by the RequestsHTTPTransport transport to close the session's connection pool. """ - pass + pass # pragma: no cover diff --git a/tests/starwars/test_subscription.py b/tests/starwars/test_subscription.py index 00bccedb..73205075 100644 --- a/tests/starwars/test_subscription.py +++ b/tests/starwars/test_subscription.py @@ -1,11 +1,21 @@ import pytest from graphql import subscribe -from gql import gql +from gql import Client, gql from .fixtures import reviews from .schema import StarWarsSchema +subscription_str = """ + subscription ListenEpisodeReviews($ep: Episode!) { + reviewAdded(episode: $ep) { + stars, + commentary, + episode + } + } +""" + @pytest.mark.asyncio async def test_subscription_support(): @@ -15,17 +25,8 @@ async def test_subscription_support(): {"stars": 5, "commentary": "This is a great movie!", "episode": 6}, ] - subs = gql( - """ - subscription ListenEpisodeReviews($ep: Episode!) { - reviewAdded(episode: $ep) { - stars, - commentary, - episode - } - } - """ - ) + subs = gql(subscription_str) + params = {"ep": "JEDI"} expected = [{**review, "episode": "JEDI"} for review in reviews[6]] @@ -34,3 +35,25 @@ async def test_subscription_support(): result = [result.data["reviewAdded"] async for result in ai] assert result == expected + + +@pytest.mark.asyncio +async def test_subscription_support_using_client(): + # reset review data for this test + reviews[6] = [ + {"stars": 3, "commentary": "Was expecting more stuff", "episode": 6}, + {"stars": 5, "commentary": "This is a great movie!", "episode": 6}, + ] + + subs = gql(subscription_str) + + params = {"ep": "JEDI"} + expected = [{**review, "episode": "JEDI"} for review in reviews[6]] + + results = [] + + async with Client(schema=StarWarsSchema) as session: + async for result in session.subscribe(subs, variable_values=params): + results.append(result["reviewAdded"]) + + assert results == expected diff --git a/tests/test_client.py b/tests/test_client.py index bbd95805..e6616ca9 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -171,6 +171,6 @@ def test_gql(): """ ) - with Client(schema=schema) as client: - result = client.execute(query) + client = Client(schema=schema) + result = client.execute(query) assert result["user"] is None