Race condition when connecting with the same websockets transport twice at the same time #105


Closed
sneko opened this issue Jun 23, 2020 · 5 comments · Fixed by #106
Labels
good first issue (First-time contributors that open a new issue) · type: bug (An issue or pull request relating to a bug)

Comments

@sneko
Contributor

sneko commented Jun 23, 2020

Hi @leszekhanusz ,

First of all, thanks again for implementing the subscription part, that's great!

I'm still having some issues setting up multiple subscriptions and I'm not sure how to solve this. Consider your example: https://github.com/graphql-python/gql/blame/master/README.md#L318-L342

Can you confirm that, for subscriptions, asyncio.create_task(...) will immediately run the function in another thread, and that all the await taskX calls are there to keep the program blocked until each task finishes?

On my side, even with your example, I get this random error (it's not immediate, sometimes after 1 second, sometimes after 10...):

RuntimeError: cannot call recv while another coroutine is already waiting for the next message

The message is pretty explicit but I don't understand how to bypass this 😢

If you have any idea 👍

Thank you,

EDIT: it's weird, because sometimes, without modifying the code, the process can run for more than 5 minutes without hitting this error...

EDIT2: Note that sometimes I also get this error from the subscribe(...) method:

    async for r in self.ws_client.subscribe(subscriptions['scanProbesRequested']):
TypeError: 'async for' requires an object with __aiter__ method, got generator

EDIT3: If I use a different way of running the async code (with the same library):

try:
    loop = asyncio.get_event_loop()
    task3 = loop.create_task(execute_subscription1())
    task4 = loop.create_task(execute_subscription2())
    loop.run_forever()

it works without any error. That's really strange...

@leszekhanusz
Collaborator

leszekhanusz commented Jun 23, 2020

Hi,

Could you please post your code? The parts where the transports, clients and sessions are created.

Regarding your EDIT2, the subscribe method on the client is a normal Python generator (not an async generator), so you can't use it with async for (a normal for is needed, but it is then synchronous and you can't use it if an event loop is running).

So you actually need to use async for on session.subscribe, not on client.subscribe.

You can get a session from the client by running:

async with client as session:
    # session is available here
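
For illustration, here is a minimal self-contained sketch of that pattern (the URL and the subscription document below are placeholders, not taken from this issue):

import asyncio

from gql import Client, gql
from gql.transport.websockets import WebsocketsTransport


async def main():
    transport = WebsocketsTransport(url="wss://example.com/graphql")

    async with Client(transport=transport) as session:
        # session.subscribe returns an async generator, so "async for" works here
        async for result in session.subscribe(gql("subscription { somethingChanged { id } }")):
            print(result)


asyncio.run(main())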

> Can you confirm that, for subscriptions, asyncio.create_task(...) will immediately run the function in another thread, and that all the await taskX calls are there to keep the program blocked until each task finishes?

That's the spirit, except that no threads are created. create_task will add the task to the event loop, and this task will be run as soon as possible by the event loop. The await will block the calling coroutine there until the task finishes.

EDIT:
An example of two queries in parallel with asyncio.gather is present in the tests:
tests/test_websocket_query.py#L194
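
For completeness, here is a rough, self-contained sketch of the same idea (not the test code itself; the URL and query documents are placeholders), running two queries in parallel on a single session with asyncio.gather:

import asyncio

from gql import Client, gql
from gql.transport.websockets import WebsocketsTransport


async def main():
    transport = WebsocketsTransport(url="wss://example.com/graphql")

    async with Client(transport=transport) as session:
        query1 = gql("query { firstThing { id } }")
        query2 = gql("query { secondThing { id } }")

        # Both requests share the single websocket connection held by the session
        result1, result2 = await asyncio.gather(
            session.execute(query1),
            session.execute(query2),
        )
        print(result1, result2)


asyncio.run(main())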

@sneko
Contributor Author

sneko commented Jun 23, 2020

Thanks for your answer @leszekhanusz, I succeeded in making it work; here are the before/after cases:

Old way with the issue


class API:
  def __init__(self, ws_url):
    ws_transport = WebsocketsTransport(
        url=ws_url,
        init_payload={}
    )

    self.ws_client = Client(
        transport=ws_transport,
    )

  async def subscribe_scan_probes_requested(self, callback):
    print("scan_probes_requested")
    # Each task enters its own "async with" on the same client,
    # so the same transport gets connected twice in parallel
    async with self.ws_client as session:
      # Note: this line also mistakenly uses self.ws_client.subscribe instead of session.subscribe
      async for r in self.ws_client.subscribe(subscriptions['scanProbesRequested']):
        callback(r)

  async def subscribe_scenario_stopped(self, callback):
    print("scenario_stopped")
    async with self.ws_client as session:
      async for r in session.subscribe(subscriptions['scenarioStopped']):
        callback(r)


async def main():
    api = API(ws_url=config.module_api_ws_addr)

    task1 = asyncio.create_task(api.subscribe_scan_probes_requested(scan_probes_requested_callback))
    task2 = asyncio.create_task(api.subscribe_scenario_stopped(scenario_stopped_callback))

    await task1
    await task2

asyncio.run(main())

New way that works

class API:
  def __init__(self, ws_url):
    ws_transport = WebsocketsTransport(
        url=ws_url,
        init_payload={}
    )

    self.ws_client = Client(
        transport=ws_transport,
    )

  async def subscribe_scan_probes_requested(self, session, callback):
    print("scan_probes_requested")
    async for r in session.subscribe(subscriptions['scanProbesRequested']):
      callback(r)

  async def subscribe_scenario_stopped(self, session, callback):
    print("scenario_stopped")
    async for r in session.subscribe(subscriptions['scenarioStopped']):
      callback(r)


async def main():
    api = API(ws_url=config.module_api_ws_addr)

    async with api.ws_client as session:
        task1 = asyncio.create_task(api.subscribe_scan_probes_requested(session, scan_probes_requested_callback))
        task2 = asyncio.create_task(api.subscribe_scenario_stopped(session, scenario_stopped_callback))

        await task1
        await task2

asyncio.run(main())

Conclusion/Differences

The only difference is that I "factored out" the creation of the session variable, and by doing this there is no longer a conflict with recv().

At first glance it seems the code should behave the same, session being a kind of async alias... but under the hood I don't know how the asyncio library deals with that, and it seems to be the origin of the conflict.

Does it seem right to you now?

Another quick question: I would like to force an auto-reconnect if the connection closes or something goes wrong. Do I have to deal with it manually, with another for ... above the async for subscribe(), but also by wrapping the websocket client so it reconnects?

I see you specified a connect_args that is merged into the params before they are given to websockets.connect() (https://websockets.readthedocs.io/en/stable/api.html#module-websockets.client), but according to their documentation https://github.com/aaugustin/websockets/blob/master/docs/faq.rst#how-do-i-create-channels-or-topics and python-websockets/websockets#414, it doesn't seem anything like that is implemented on their side.

If you already have an example of reconnection with the GQL client, it would be really appreciated!

Thank you,

EDIT: I'm closing the issue since it's solved. If you have any advice about reconnection, I'm still interested to hear more about it :)

@sneko closed this as completed Jun 23, 2020
@leszekhanusz
Collaborator

leszekhanusz commented Jun 23, 2020

You figured it out.

The problem was that you used async with client as session: twice in parallel in the first version. This would cause the code to try to connect twice using the same transport. This is normally not allowed and should raise a TransportAlreadyConnected exception.

But because of a race condition in the websockets transport, it tried to connect twice at the same time... This is a small bug in the transport.

Regarding the retries, I use the backoff module, which lets you add a decorator to an async function. This ensures that if a problem happens, it will retry, but with a delay that gets longer after each attempt. I plan to add documentation about this.

Something like this:

@backoff.on_exception(backoff.expo,
                      Exception,
                      max_value=60)
async def main():
    # your connection here
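
For illustration, a rough sketch of how that decorator could wrap the whole connect-and-subscribe block, so that every retry reconnects and restarts the subscription (the URL and subscription document are placeholders, not from this issue):

import asyncio

import backoff
from gql import Client, gql
from gql.transport.websockets import WebsocketsTransport


@backoff.on_exception(backoff.expo, Exception, max_value=60)
async def main():
    # Build a fresh transport and client on every retry so the websocket is recreated
    transport = WebsocketsTransport(url="wss://example.com/graphql")

    async with Client(transport=transport) as session:
        # Re-entering this block after a failure also restarts the subscription
        async for result in session.subscribe(gql("subscription { somethingChanged { id } }")):
            print(result)


asyncio.run(main())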

EDIT:
Note that you can use asyncio.gather now if you want to save a line :-)

@leszekhanusz reopened this Jun 23, 2020
@leszekhanusz added the type: bug label Jun 23, 2020
@leszekhanusz changed the title from "Issue to subscribe in multiple tasks" to "Race condition when connecting with the same websockets transport twice at the same time" Jun 23, 2020
@sneko
Contributor Author

sneko commented Jun 23, 2020

Indeed having a retry example in the documentation will help 👍

I'm trying to set it up right now; I already did that in JavaScript to enable auto-reconnect and it was not that trivial 😄

With the Python library and asyncio, I think I need to wrap the websocket initialization, but also all my subscribe calls, in a @backoff-decorated function. What do you think? Something like:

@backoff.on_exception(backoff.expo, Exception, max_time=60)
async def main():
    try:
        api = API(ws_url=config.module_api_ws_addr)
        
        async with api.ws_client as session:
            task1 = asyncio.create_task(api.subscribe_scan_probes_requested(session, scan_probes_requested_callback))
            task2 = asyncio.create_task(api.subscribe_scenario_stopped(session, scenario_stopped_callback))

            await task1
            await task2
    except Exception as e:
        # Properly end all async tasks before the backoff retry
        task1.cancel()
        task2.cancel()

        raise e

asyncio.run(main())

(In JavaScript with my Apollo client, I just had to make the websocket "subclient" reconnect, without dealing with re-subscribing to all subscriptions. I guess the library was handling that automatically.)

But since in Python the multiple subscribe() calls run as separate tasks tied to a specific session variable, I don't see how each async for session.subscribe() would keep working after a re-init of the underlying websocket client.

Am I right?

Thank you again (and sorry for mixing both questions)

@leszekhanusz
Collaborator

Fixed in version v3.0.0a1
