Skip to content

Commit 3970f1c

Browse files
authored
Permanent reconnecting async session (#324)
1 parent 6c91bb5 commit 3970f1c

12 files changed

+872
-10
lines changed
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
.. _async_permanent_session:
2+
3+
Async permanent session
4+
=======================
5+
6+
Sometimes you want to have a single permanent reconnecting async session to a GraphQL backend,
7+
and that can be `difficult to manage`_ manually with the :code:`async with client as session` syntax.
8+
9+
It is now possible to have a single reconnecting session using the
10+
:meth:`connect_async <gql.Client.connect_async>` method of Client
11+
with a :code:`reconnecting=True` argument.
12+
13+
.. code-block:: python
14+
15+
# Create a session from the client which will reconnect automatically.
16+
# This session can be kept in a class for example to provide a way
17+
# to execute GraphQL queries from many different places
18+
session = await client.connect_async(reconnecting=True)
19+
20+
# You can run execute or subscribe method on this session
21+
result = await session.execute(query)
22+
23+
# When you want the connection to close (for cleanup),
24+
# you call close_async
25+
await client.close_async()
26+
27+
28+
When you use :code:`reconnecting=True`, gql will watch the exceptions generated
29+
during the execute and subscribe calls and, if it detects a TransportClosed exception
30+
(indicating that the link to the underlying transport is broken),
31+
it will try to reconnect to the backend again.
32+
33+
Retries
34+
-------
35+
36+
Connection retries
37+
^^^^^^^^^^^^^^^^^^
38+
39+
With :code:`reconnecting=True`, gql will use the `backoff`_ module to repeatedly try to connect with
40+
exponential backoff and jitter with a maximum delay of 60 seconds by default.
41+
42+
You can change the default reconnecting profile by providing your own
43+
backoff decorator to the :code:`retry_connect` argument.
44+
45+
.. code-block:: python
46+
47+
# Here wait maximum 5 minutes between connection retries
48+
retry_connect = backoff.on_exception(
49+
backoff.expo, # wait generator (here: exponential backoff)
50+
Exception, # which exceptions should cause a retry (here: everything)
51+
max_value=300, # max wait time in seconds
52+
)
53+
session = await client.connect_async(
54+
reconnecting=True,
55+
retry_connect=retry_connect,
56+
)
57+
58+
Execution retries
59+
^^^^^^^^^^^^^^^^^
60+
61+
With :code:`reconnecting=True`, by default we will also retry up to 5 times
62+
when an exception happens during an execute call (to manage a possible loss in the connection
63+
to the transport).
64+
65+
There is no retry in case of a :code:`TransportQueryError` exception as it indicates that
66+
the connection to the backend is working correctly.
67+
68+
You can change the default execute retry profile by providing your own
69+
backoff decorator to the :code:`retry_execute` argument.
70+
71+
.. code-block:: python
72+
73+
# Here Only 3 tries for execute calls
74+
retry_execute = backoff.on_exception(
75+
backoff.expo,
76+
Exception,
77+
max_tries=3,
78+
giveup=lambda e: isinstance(e, TransportQueryError),
79+
)
80+
session = await client.connect_async(
81+
reconnecting=True,
82+
retry_execute=retry_execute,
83+
)
84+
85+
If you don't want any retry on the execute calls, you can disable the retries with :code:`retry_execute=False`
86+
87+
Subscription retries
88+
^^^^^^^^^^^^^^^^^^^^
89+
90+
There is no :code:`retry_subscribe` as it is not feasible with async generators.
91+
If you want retries for your subscriptions, then you can do it yourself
92+
with backoff decorators on your methods.
93+
94+
.. code-block:: python
95+
96+
@backoff.on_exception(backoff.expo,
97+
Exception,
98+
max_tries=3,
99+
giveup=lambda e: isinstance(e, TransportQueryError))
100+
async def execute_subscription1(session):
101+
async for result in session.subscribe(subscription1):
102+
print(result)
103+
104+
FastAPI example
105+
---------------
106+
107+
.. literalinclude:: ../code_examples/fastapi_async.py
108+
109+
Console example
110+
---------------
111+
112+
.. literalinclude:: ../code_examples/console_async.py
113+
114+
.. _difficult to manage: https://github.com/graphql-python/gql/issues/179
115+
.. _backoff: https://github.com/litl/backoff

docs/advanced/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Advanced
55
:maxdepth: 2
66

77
async_advanced_usage
8+
async_permanent_session
89
logging
910
error_handling
1011
local_schema

docs/code_examples/console_async.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import asyncio
2+
import logging
3+
4+
from aioconsole import ainput
5+
6+
from gql import Client, gql
7+
from gql.transport.aiohttp import AIOHTTPTransport
8+
9+
logging.basicConfig(level=logging.INFO)
10+
11+
GET_CONTINENT_NAME = """
12+
query getContinentName ($code: ID!) {
13+
continent (code: $code) {
14+
name
15+
}
16+
}
17+
"""
18+
19+
20+
class GraphQLContinentClient:
21+
def __init__(self):
22+
self._client = Client(
23+
transport=AIOHTTPTransport(url="https://countries.trevorblades.com/")
24+
)
25+
self._session = None
26+
27+
self.get_continent_name_query = gql(GET_CONTINENT_NAME)
28+
29+
async def connect(self):
30+
self._session = await self._client.connect_async(reconnecting=True)
31+
32+
async def close(self):
33+
await self._client.close_async()
34+
35+
async def get_continent_name(self, code):
36+
params = {"code": code}
37+
38+
answer = await self._session.execute(
39+
self.get_continent_name_query, variable_values=params
40+
)
41+
42+
return answer.get("continent").get("name")
43+
44+
45+
async def main():
46+
continent_client = GraphQLContinentClient()
47+
48+
continent_codes = ["AF", "AN", "AS", "EU", "NA", "OC", "SA"]
49+
50+
await continent_client.connect()
51+
52+
while True:
53+
54+
answer = await ainput("\nPlease enter a continent code or 'exit':")
55+
answer = answer.strip()
56+
57+
if answer == "exit":
58+
break
59+
elif answer in continent_codes:
60+
61+
try:
62+
continent_name = await continent_client.get_continent_name(answer)
63+
print(f"The continent name is {continent_name}\n")
64+
except Exception as exc:
65+
print(f"Received exception {exc} while trying to get continent name")
66+
67+
else:
68+
print(f"Please enter a valid continent code from {continent_codes}")
69+
70+
await continent_client.close()
71+
72+
73+
asyncio.run(main())

docs/code_examples/fastapi_async.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# First install fastapi and uvicorn:
2+
#
3+
# pip install fastapi uvicorn
4+
#
5+
# then run:
6+
#
7+
# uvicorn fastapi_async:app --reload
8+
9+
import logging
10+
11+
from fastapi import FastAPI, HTTPException
12+
from fastapi.responses import HTMLResponse
13+
14+
from gql import Client, gql
15+
from gql.transport.aiohttp import AIOHTTPTransport
16+
17+
logging.basicConfig(level=logging.DEBUG)
18+
log = logging.getLogger(__name__)
19+
20+
transport = AIOHTTPTransport(url="https://countries.trevorblades.com/graphql")
21+
22+
client = Client(transport=transport)
23+
24+
query = gql(
25+
"""
26+
query getContinentInfo($code: ID!) {
27+
continent(code:$code) {
28+
name
29+
code
30+
countries {
31+
name
32+
capital
33+
}
34+
}
35+
}
36+
"""
37+
)
38+
39+
app = FastAPI()
40+
41+
42+
@app.on_event("startup")
43+
async def startup_event():
44+
print("Connecting to GraphQL backend")
45+
46+
await client.connect_async(reconnecting=True)
47+
print("End of startup")
48+
49+
50+
@app.on_event("shutdown")
51+
async def shutdown_event():
52+
print("Shutting down GraphQL permanent connection...")
53+
await client.close_async()
54+
print("Shutting down GraphQL permanent connection... done")
55+
56+
57+
continent_codes = [
58+
"AF",
59+
"AN",
60+
"AS",
61+
"EU",
62+
"NA",
63+
"OC",
64+
"SA",
65+
]
66+
67+
68+
@app.get("/", response_class=HTMLResponse)
69+
def get_root():
70+
71+
continent_links = ", ".join(
72+
[f'<a href="continent/{code}">{code}</a>' for code in continent_codes]
73+
)
74+
75+
return f"""
76+
<html>
77+
<head>
78+
<title>Continents</title>
79+
</head>
80+
<body>
81+
Continents: {continent_links}
82+
</body>
83+
</html>
84+
"""
85+
86+
87+
@app.get("/continent/{continent_code}")
88+
async def get_continent(continent_code):
89+
90+
if continent_code not in continent_codes:
91+
raise HTTPException(status_code=404, detail="Continent not found")
92+
93+
try:
94+
result = await client.session.execute(
95+
query, variable_values={"code": continent_code}
96+
)
97+
except Exception as e:
98+
log.debug(f"get_continent Error: {e}")
99+
raise HTTPException(status_code=503, detail="GraphQL backend unavailable")
100+
101+
return result
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import asyncio
2+
import logging
3+
4+
import backoff
5+
6+
from gql import Client, gql
7+
from gql.transport.aiohttp import AIOHTTPTransport
8+
9+
logging.basicConfig(level=logging.INFO)
10+
11+
12+
async def main():
13+
14+
# Note: this example used the test backend from
15+
# https://github.com/slothmanxyz/typegraphql-ws-apollo
16+
transport = AIOHTTPTransport(url="ws://localhost:5000/graphql")
17+
18+
client = Client(transport=transport)
19+
20+
retry_connect = backoff.on_exception(
21+
backoff.expo,
22+
Exception,
23+
max_value=10,
24+
jitter=None,
25+
)
26+
session = await client.connect_async(reconnecting=True, retry_connect=retry_connect)
27+
28+
num = 0
29+
30+
while True:
31+
num += 1
32+
33+
# Execute single query
34+
query = gql("mutation ($message: String!) {sendMessage(message: $message)}")
35+
36+
params = {"message": f"test {num}"}
37+
38+
try:
39+
result = await session.execute(query, variable_values=params)
40+
print(result)
41+
except Exception as e:
42+
print(f"Received exception {e}")
43+
44+
await asyncio.sleep(1)
45+
46+
47+
asyncio.run(main())
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import asyncio
2+
import logging
3+
4+
import backoff
5+
6+
from gql import Client, gql
7+
from gql.transport.websockets import WebsocketsTransport
8+
9+
logging.basicConfig(level=logging.INFO)
10+
11+
12+
async def main():
13+
14+
# Note: this example used the test backend from
15+
# https://github.com/slothmanxyz/typegraphql-ws-apollo
16+
transport = WebsocketsTransport(url="ws://localhost:5000/graphql")
17+
18+
client = Client(transport=transport)
19+
20+
retry_connect = backoff.on_exception(
21+
backoff.expo,
22+
Exception,
23+
max_value=10,
24+
jitter=None,
25+
)
26+
session = await client.connect_async(reconnecting=True, retry_connect=retry_connect)
27+
28+
num = 0
29+
30+
while True:
31+
num += 1
32+
33+
# Execute single query
34+
query = gql("mutation ($message: String!) {sendMessage(message: $message)}")
35+
36+
params = {"message": f"test {num}"}
37+
38+
try:
39+
result = await session.execute(query, variable_values=params)
40+
print(result)
41+
except Exception as e:
42+
print(f"Received exception {e}")
43+
44+
await asyncio.sleep(1)
45+
46+
47+
asyncio.run(main())

0 commit comments

Comments
 (0)