|
1 | 1 | import asyncio
|
2 |
| -from inspect import isawaitable |
3 |
| -from typing import Any, AsyncGenerator, Dict, Generator, Optional, Union, cast |
| 2 | +from typing import Any, AsyncGenerator, Dict, Generator, Optional, Union |
4 | 3 |
|
5 | 4 | from graphql import (
|
6 | 5 | DocumentNode,
|
@@ -196,19 +195,22 @@ class SyncClientSession:
|
196 | 195 | def __init__(self, client: Client):
|
197 | 196 | self.client = client
|
198 | 197 |
|
199 |
| - def execute(self, document: DocumentNode, *args, **kwargs) -> Dict: |
| 198 | + def _execute(self, document: DocumentNode, *args, **kwargs) -> ExecutionResult: |
200 | 199 |
|
201 | 200 | # Validate document
|
202 | 201 | if self.client.schema:
|
203 | 202 | self.client.validate(document)
|
204 | 203 |
|
205 |
| - result = self.transport.execute(document, *args, **kwargs) |
| 204 | + return self.transport.execute(document, *args, **kwargs) |
| 205 | + |
| 206 | + def execute(self, document: DocumentNode, *args, **kwargs) -> Dict: |
206 | 207 |
|
207 |
| - assert not isawaitable(result), "Transport returned an awaitable result." |
208 |
| - result = cast(ExecutionResult, result) |
| 208 | + # Validate and execute on the transport |
| 209 | + result = self._execute(document, *args, **kwargs) |
209 | 210 |
|
| 211 | + # Raise an error if an error is returned in the ExecutionResult object |
210 | 212 | if result.errors:
|
211 |
| - raise TransportQueryError(str(result.errors[0])) |
| 213 | + raise TransportQueryError(str(result.errors[0]), errors=result.errors) |
212 | 214 |
|
213 | 215 | assert (
|
214 | 216 | result.data is not None
|
@@ -250,43 +252,69 @@ async def fetch_and_validate(self, document: DocumentNode):
|
250 | 252 | if self.client.schema:
|
251 | 253 | self.client.validate(document)
|
252 | 254 |
|
253 |
| - async def subscribe( |
| 255 | + async def _subscribe( |
254 | 256 | self, document: DocumentNode, *args, **kwargs
|
255 |
| - ) -> AsyncGenerator[Dict, None]: |
| 257 | + ) -> AsyncGenerator[ExecutionResult, None]: |
256 | 258 |
|
257 | 259 | # Fetch schema from transport if needed and validate document if possible
|
258 | 260 | await self.fetch_and_validate(document)
|
259 | 261 |
|
260 |
| - # Subscribe to the transport and yield data or raise error |
261 |
| - self._generator: AsyncGenerator[ |
| 262 | + # Subscribe to the transport |
| 263 | + inner_generator: AsyncGenerator[ |
262 | 264 | ExecutionResult, None
|
263 | 265 | ] = self.transport.subscribe(document, *args, **kwargs)
|
264 | 266 |
|
265 |
| - async for result in self._generator: |
| 267 | + # Keep a reference to the inner generator to allow the user to call aclose() |
| 268 | + # before a break if python version is too old (pypy3 py 3.6.1) |
| 269 | + self._generator = inner_generator |
| 270 | + |
| 271 | + async for result in inner_generator: |
266 | 272 | if result.errors:
|
267 | 273 | # Note: we need to run generator.aclose() here or the finally block in
|
268 | 274 | # transport.subscribe will not be reached in pypy3 (py 3.6.1)
|
269 |
| - await self._generator.aclose() |
| 275 | + await inner_generator.aclose() |
| 276 | + |
| 277 | + yield result |
| 278 | + |
| 279 | + async def subscribe( |
| 280 | + self, document: DocumentNode, *args, **kwargs |
| 281 | + ) -> AsyncGenerator[Dict, None]: |
270 | 282 |
|
271 |
| - raise TransportQueryError(str(result.errors[0])) |
| 283 | + # Validate and subscribe on the transport |
| 284 | + async for result in self._subscribe(document, *args, **kwargs): |
| 285 | + |
| 286 | + # Raise an error if an error is returned in the ExecutionResult object |
| 287 | + if result.errors: |
| 288 | + raise TransportQueryError(str(result.errors[0]), errors=result.errors) |
272 | 289 |
|
273 | 290 | elif result.data is not None:
|
274 | 291 | yield result.data
|
275 | 292 |
|
276 |
| - async def execute(self, document: DocumentNode, *args, **kwargs) -> Dict: |
| 293 | + async def _execute( |
| 294 | + self, document: DocumentNode, *args, **kwargs |
| 295 | + ) -> ExecutionResult: |
277 | 296 |
|
278 | 297 | # Fetch schema from transport if needed and validate document if possible
|
279 | 298 | await self.fetch_and_validate(document)
|
280 | 299 |
|
281 | 300 | # Execute the query with the transport with a timeout
|
282 |
| - result = await asyncio.wait_for( |
| 301 | + return await asyncio.wait_for( |
283 | 302 | self.transport.execute(document, *args, **kwargs),
|
284 | 303 | self.client.execute_timeout,
|
285 | 304 | )
|
286 | 305 |
|
| 306 | + async def execute(self, document: DocumentNode, *args, **kwargs) -> Dict: |
| 307 | + |
| 308 | + # Validate and execute on the transport |
| 309 | + result = await self._execute(document, *args, **kwargs) |
| 310 | + |
287 | 311 | # Raise an error if an error is returned in the ExecutionResult object
|
288 | 312 | if result.errors:
|
289 |
| - raise TransportQueryError(str(result.errors[0])) |
| 313 | + raise TransportQueryError(str(result.errors[0]), errors=result.errors) |
| 314 | + |
| 315 | + assert ( |
| 316 | + result.data is not None |
| 317 | + ), "Transport returned an ExecutionResult without data or errors" |
290 | 318 |
|
291 | 319 | return result.data
|
292 | 320 |
|
|
0 commit comments