Document how to wait on multiple futures #48

Closed
aaugustin opened this issue Mar 29, 2015 · 10 comments
@aaugustin
Member

As requested here: edf3a32#commitcomment-10455621

@Natim

Natim commented Apr 4, 2015

@dmwyatt I'd like to help you with this. Could you give me an example of what you'd like to do?

What should we add to this example:

#!/usr/bin/env python

import asyncio
import websockets

@asyncio.coroutine
def hello(websocket, path):
    name = yield from websocket.recv()
    print("< {}".format(name))
    greeting = "Hello {}!".format(name)
    yield from websocket.send(greeting)
    print("> {}".format(greeting))

start_server = websockets.serve(hello, 'localhost', 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

@Natim

Natim commented Apr 4, 2015

So what you'd like is to wait on both websocket.recv() and a producer at the same time?

If this is the case, I just did something like that:

#!/usr/bin/env python

import asyncio
import aioredis
import websockets


@asyncio.coroutine
def simulate_web_task():
    redis = yield from aioredis.create_redis(('localhost', 6379))
    print("$ Web server sleeps")
    yield from asyncio.sleep(1)
    print("$ Web server add task hello")
    yield from redis.lpush("tasks", "hello")
    yield from asyncio.sleep(2)


@asyncio.coroutine
def simulate_client():
    websocket = yield from websockets.connect('ws://localhost:8765')
    print("> Client sleeps")
    yield from asyncio.sleep(2)
    print("> Client sends Hello")
    yield from websocket.send("Hello")
    yield from asyncio.sleep(2)


@asyncio.coroutine
def server_handler(websocket, path):
    redis = yield from aioredis.create_redis(('localhost', 6379))

    client_task = websocket.recv()
    web_task = redis.blpop("tasks")

    print("# Create client and web tasks")
    pending = {client_task, web_task}
    counter = 0

    while websocket.open and counter < 2:
        done, pending = yield from asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)

        for task in done:
            if task is client_task:
                client_message = task.result()
                print("# Client says: %s" % client_message)
                client_task = websocket.recv()
                pending.add(client_task)
            elif task is web_task:
                web_message = task.result()
                print("# Web server says: %s" % web_message[1].decode('utf-8'))
                web_task = redis.blpop("tasks")
                pending.add(web_task)

            counter += 1
    print("# End of demo")

start_server = websockets.serve(server_handler, 'localhost', 8765)
asyncio.get_event_loop().run_until_complete(start_server)

asyncio.get_event_loop().run_until_complete(
    asyncio.wait({simulate_client(), simulate_web_task()})
)

@aleksandr-vin

Correct me if I'm wrong:

client_task = websocket.recv()

should be:

client_task = asyncio.Task(websocket.recv())
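For what it's worth, the distinction seems to matter because asyncio.wait() hands back Task objects in `done`, so an identity check such as `task is client_task` can only succeed if client_task is itself a task rather than a bare coroutine (and since Python 3.11, asyncio.wait() rejects bare coroutines outright). Below is a minimal sketch of the pattern, assuming two asyncio.Queue objects as hypothetical stand-ins for websocket.recv() and redis.blpop(); neither library is used here, and the names `multiplex` and `expected` are made up for illustration. It also replaces only the task that finished and leaves the other one pending.

```python
import asyncio


async def multiplex(client_queue, web_queue, expected):
    """Wait on two sources at once; the queues stand in for recv() and blpop()."""
    # Wrap each coroutine in a Task so that the objects returned in `done`
    # are the very same objects we hold references to.
    client_task = asyncio.ensure_future(client_queue.get())
    web_task = asyncio.ensure_future(web_queue.get())
    pending = {client_task, web_task}
    received = []

    # Loop until at least `expected` messages have arrived.
    while len(received) < expected:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            if task is client_task:
                received.append(("client", task.result()))
                # Only the finished task is replaced; the other stays pending.
                client_task = asyncio.ensure_future(client_queue.get())
                pending.add(client_task)
            elif task is web_task:
                received.append(("web", task.result()))
                web_task = asyncio.ensure_future(web_queue.get())
                pending.add(web_task)

    # Cancel whatever is still waiting so nothing is left pending at exit.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return received


async def demo():
    client_queue, web_queue = asyncio.Queue(), asyncio.Queue()
    await client_queue.put("Hello")
    await web_queue.put("task-1")
    await web_queue.put("task-2")
    return await multiplex(client_queue, web_queue, expected=3)


result = asyncio.run(demo())
```

The order in which the two sources are reported within one round is not guaranteed, so callers should not rely on it.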

@kylemacfarlane

kylemacfarlane commented May 21, 2016

I am trying to do this and the example in the docs doesn't work. In the example when one task yields it restarts both tasks. This means any data being waited on in the slower task is lost, asyncio.sleeps are ignored, and so on.

I got it to work with the following which is more like how you'd do it with gevent. It would be simpler if websockets.connect() could be used synchronously. I think it's trying to be a bit too magic and made things harder to figure out than they could have been.

@asyncio.coroutine
def producer(websocket):
    while True:
        yield from websocket.send('Bringing you the latest news every 10 seconds!')
        yield from asyncio.sleep(10)

@asyncio.coroutine
def consumer(websocket):
    while True:
        msg = yield from websocket.recv()
        print('Received immediate update:', msg)

@asyncio.coroutine # Not actually needed if you only want one websocket client
def handler():
    websocket = yield from websockets.connect('wss://example.com')

    # Normally we would start the event loop here rather than use a while but see below...
    while True:
        yield from asyncio.wait((
            producer(websocket),
            consumer(websocket)
        ))

asyncio.get_event_loop().run_until_complete(handler()) # ... we have to start the loop out here to make websockets.connect() return something useful

@kylemacfarlane

kylemacfarlane commented May 21, 2016

While the above code does work I can't figure out how to gracefully stop it to prevent pending task errors. To gracefully stop the loop it seems like you need access to both the loop and the tasks and of course you're not allowed to stop and start the loop from inside the loop itself.

I can't get access to the websocket client outside of the loop so I can't create the tasks outside of the loop. By the time I have access to both the loop and the tasks I can't gracefully stop the loop because I'm inside the loop.

@aaugustin
Member Author

In the example when one task yields it restarts both tasks. This means any data being waited on in the slower task is lost, asyncio.sleeps are ignored, and so on.

This is really a question about multiplexing and synchronizing asyncio tasks, not about websockets.

You haven't really described what you're trying to do, but I think the following should work:

  • start a producer task
  • have it put messages into a queue
  • poll the queue instead of the producer -- cancelling and restarting this coroutine won't cause these problems.
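The three steps above can be sketched roughly as follows. This is only an illustration under stated assumptions: there is no websocket involved, the message text and the timings are invented, and `handler` simply collects what it reads instead of sending it anywhere. The point is that the producer runs as one long-lived task, while the cheap queue.get() call is the only thing that gets cancelled and restarted.

```python
import asyncio


async def producer(queue, count):
    """Long-running task: its sleep is never interrupted by the reader."""
    for i in range(count):
        await asyncio.sleep(0.05)
        await queue.put("news #{}".format(i))


async def handler(count):
    queue = asyncio.Queue()
    producer_task = asyncio.ensure_future(producer(queue, count))
    delivered = []
    timeouts = 0
    try:
        while len(delivered) < count:
            try:
                # Only this get() is cancelled on timeout. The producer keeps
                # running, and anything it has queued waits for the next get().
                message = await asyncio.wait_for(queue.get(), timeout=0.02)
            except asyncio.TimeoutError:
                timeouts += 1
                continue
            delivered.append(message)
    finally:
        # Stop the producer and wait for it so no task is left pending.
        producer_task.cancel()
        await asyncio.gather(producer_task, return_exceptions=True)
    return delivered, timeouts


delivered, timeouts = asyncio.run(handler(3))
```

In a real handler the timeout would typically be replaced by waiting on queue.get() and websocket.recv() together, but the division of labour stays the same.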

I got it to work with the following which is more like how you'd do it with gevent.

Applying techniques for implicit async I/O with gevent to asyncio's explicit async I/O is unlikely to go well.

It would be simpler if websockets.connect() could be used synchronously.

connect() is a wrapper around asyncio's create_connection(), which is a coroutine, so it has to be a coroutine too. Since we can't go back in time and convince Guido van Rossum to design asyncio differently, I don't see what I can do about this.

I think it's trying to be a bit too magic and made things harder to figure out than they could have been.

If you can propose a simpler / less magic API, I'm interested. The goal of this library is to make working with websockets in asyncio as straightforward as possible.

@kylemacfarlane

Using the examples in the docs how are you meant to stop the loop without spitting out pending task errors? Interrupts always seem to originate from run_forever/run_until_complete, so any cleanup that needs to be done has to have access to everything needed at that point. But in the examples the websocket connection is only available inside the loop so how do you close it during cleanup outside of the loop?

Also the third example on the intro page ends with finally: yield from websocket.close(). This actually doesn't work and causes a RuntimeError as the loop will have been stopped by the interrupt. What needs to be done is for all tasks to be gathered and cancelled and then the loop started again for one final run.

In the end I came up with the following which works the same as the previous code but also stops gracefully with no pending task errors. However I still don't like how complicated it is to merely close the websocket.

@asyncio.coroutine
def open_websocket(url):
    websocket = yield from websockets.connect(url)
    return websocket

@asyncio.coroutine
def close_websocket(websocket):
    yield from websocket.close()

@asyncio.coroutine
def producer(websocket):
    while True:
        yield from websocket.send('Bringing you the latest news every 10 seconds!')
        yield from asyncio.sleep(10)

@asyncio.coroutine
def consumer(websocket):
    while True:
        msg = yield from websocket.recv()
        print('Received immediate update:', msg)


loop = asyncio.get_event_loop()

# asyncio.async() was renamed to asyncio.ensure_future(); `async` is a
# reserved word since Python 3.7, so the old spelling no longer parses.
task = asyncio.ensure_future(open_websocket('wss://example.com'))
loop.run_until_complete(task)
websocket = task.result()

tasks = asyncio.gather(
    asyncio.ensure_future(producer(websocket)),
    asyncio.ensure_future(consumer(websocket))
)

try:
    loop.run_until_complete(tasks)
except KeyboardInterrupt:
    tasks.cancel()
    loop.run_forever()
    tasks.exception()
finally:
    loop.run_until_complete(asyncio.ensure_future(close_websocket(websocket)))
    loop.close()

If you can propose a simpler / less magic API, I'm interested. The goal of this library is to make working with websockets in asyncio as straightforward as possible.

I think when people are looking for a "Python websocket client" they are wanting something similar to the Javascript client. That means simply registering callbacks and not having to use queues or anything like that.

@aaugustin
Member Author

aaugustin commented May 22, 2016

Using the examples in the docs how are you meant to stop the loop without spitting out pending task errors?

Usually pending tasks being cancelled isn't a problem if you're shutting down the connection anyway. If it is, just catch CancelledError in the tasks and handle it.

But in the examples the websocket connection is only available inside the loop so how do you close it during cleanup outside of the loop?

As documented, websockets takes care of closing the connection so you don't have to. This happens when a handler terminates or when you shut down the server.

EDIT: oops, I mixed this up with what happens on the server side (it's a common question). On the client side, for your use case, I think you can do this:

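async def websocket_client():
    websocket = await websockets.connect('ws://...')
    try:
        ...  # do stuff
    except asyncio.CancelledError:
        # Close the connection object (not the module), using await
        # because this is an async def coroutine.
        await websocket.close()
        raise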

or this:

async def websocket_client():
    async with websockets.connect('ws://localhost:8765') as websocket:
        ...  # do stuff

Also the third example on the intro page ends with finally: yield from websocket.close(). This actually doesn't work and causes a RuntimeError as the loop will have been stopped by the interrupt. What needs to be done is for all tasks to be gathered and cancelled and then the loop started again for one final run.

In the end I came up with the following which works the same as the previous code but also stops gracefully with no pending task errors. However I still don't like how complicated it is to merely close the websocket.

I think you're writing more complicated code than what's actually needed because you aren't very familiar with asyncio yet and you end up fighting it instead of taking advantage of it.

You should run three tasks in parallel:

  • the consumer
  • the producer
  • the websocket client, where you can use try/finally to close the connection.
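One way to read the three-task layout above is sketched below. It is not taken from the websockets documentation: `FakeConnection` is a hypothetical in-memory stand-in so that the example runs without a server, and the timings are arbitrary. The client task owns the connection, runs the producer and consumer, and closes the connection in a finally block however it exits.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a websocket connection (not the websockets API)."""

    def __init__(self):
        self.incoming = asyncio.Queue()
        self.sent = []
        self.closed = False

    async def recv(self):
        return await self.incoming.get()

    async def send(self, message):
        self.sent.append(message)

    async def close(self):
        self.closed = True


async def producer(connection):
    while True:
        await connection.send("news")
        await asyncio.sleep(0.01)


async def consumer(connection, received):
    while True:
        received.append(await connection.recv())


async def client(connection, received):
    """Owns the connection: runs both workers and always closes on the way out."""
    workers = [
        asyncio.ensure_future(producer(connection)),
        asyncio.ensure_future(consumer(connection, received)),
    ]
    try:
        await asyncio.gather(*workers)
    finally:
        # Runs on normal exit, on error, and when client() itself is cancelled.
        for worker in workers:
            worker.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
        await connection.close()


async def demo():
    connection = FakeConnection()
    received = []
    await connection.incoming.put("hello")
    client_task = asyncio.ensure_future(client(connection, received))
    await asyncio.sleep(0.05)   # let the workers run for a moment
    client_task.cancel()        # simulate a shutdown request
    await asyncio.gather(client_task, return_exceptions=True)
    return connection, received


connection, received = asyncio.run(demo())
```

Because the cleanup lives inside the client task, nothing outside the event loop needs a reference to the connection.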

I think when people are looking for a "Python websocket client" they are wanting something similar to the Javascript client. That means simply registering callbacks and not having to use queues or anything like that.

If you prefer callback-based programming, you should stop using asyncio and websockets. The point of asyncio is to provide coroutine-based async I/O handling instead of callback-based. The point of websockets is to provide a coroutine-based API.

So I'm afraid you ended up in the wrong place! Perhaps Twisted or Tornado would work better for you.

As far as I'm concerned, I won't exchange coroutines for callbacks, exactly for the same reason I won't exchange functions for gotos. But I'm aware there's a learning curve (or, more precisely, an unlearning curve).

@kylemacfarlane

Usually pending tasks being cancelled isn't a problem if you're shutting down the connection anyway.

It doesn't matter in terms of what the code is going to do but it does make a real mess of the logs.

I found that registering signal handlers makes things behave a lot nicer. In the end I came up with the following:

@asyncio.coroutine
def main_loop(loop):
    websocket = yield from websockets.connect('wss://example.com')

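    # asyncio.async() was renamed to asyncio.ensure_future()
    tasks = asyncio.gather(
        asyncio.ensure_future(producer(websocket)),
        asyncio.ensure_future(consumer(websocket))
    )

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, tasks.cancel)

    try:
        # The loop= argument was removed in Python 3.10; the running loop is used
        yield from asyncio.wait((tasks,))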
    finally:
        # Exceptions make GatheringFuture think it's done so to truly cancel
        # all tasks we need to access the real children
        for task in tasks._children:
            task.cancel()
        yield from websocket.close()
        return tasks.exception()

loop = asyncio.get_event_loop()
e = loop.run_until_complete(main_loop(loop))
# e is the exception (if any) that stopped the loop. You can log it or if it's
# something like ConnectionClosed you can start a new loop to reconnect
print(e)
loop.close()

I've found that this code will cleanly shut down whether that be caused by an interrupt or an exception.
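A variation on the same idea, written with async/await and without touching the private `_children` attribute, might look like the sketch below. Treat it as an assumption-laden illustration: it is Unix-only (loop.add_signal_handler() is not available on Windows), it must run in the main thread, the `worker` coroutine is a placeholder for the real producer and consumer, and the demo sends SIGTERM to its own process to simulate an external `kill`.

```python
import asyncio
import os
import signal


async def worker(name, log):
    """Placeholder for a producer or consumer; records when it is cancelled."""
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append(name + " cancelled")
        raise


async def main_loop(log):
    loop = asyncio.get_running_loop()
    # Keep our own references to the children instead of reaching into
    # the gathering future's private attributes.
    children = [
        asyncio.ensure_future(worker("producer", log)),
        asyncio.ensure_future(worker("consumer", log)),
    ]
    tasks = asyncio.gather(*children)

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, tasks.cancel)

    try:
        await tasks
    except asyncio.CancelledError:
        log.append("shutdown requested")
    finally:
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.remove_signal_handler(sig)
        for child in children:
            child.cancel()
        # Wait for every child to finish so none is left pending.
        await asyncio.gather(*children, return_exceptions=True)


async def demo():
    log = []
    loop = asyncio.get_running_loop()
    # Simulate an external `kill` by signalling our own process shortly.
    loop.call_later(0.05, os.kill, os.getpid(), signal.SIGTERM)
    await main_loop(log)
    return log


log = asyncio.run(demo())
```

Holding the child tasks in a plain list makes it possible to cancel and await each of them even after the gathering future has already completed with an exception.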

@byxor

byxor commented Nov 23, 2016

Thank you @kylemacfarlane for asking these questions here. And thank you @aaugustin for answering them so well.

This comment section has saved me from certain disaster. I've been fighting with asyncio for many hours of every day of the last 3 weeks. This solves almost all of my problems.

@aaugustin aaugustin mentioned this issue Jun 23, 2019
18 tasks