-
Notifications
You must be signed in to change notification settings - Fork 0
Client streaming tests #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Client streaming tests #1
Conversation
|
||
requests.close() | ||
|
||
assert await to_list(responses) == expected_responses |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically this is an incorrect usage of the API, (though it could be the API that's wrong...).
It looks like the sending task is canceled on the call to close, having not been given much chance to complete. and so to_list(responses) doesn't get all the responses because the request was cut short.
Now I can see how this is surprising, but it's not obvious whether the solution to this is a design change or just some documentation of the kind of sharp edge that is common with async/await.
It's not possible to await on a construction of the channel, and the intention is that the calling coro can continue while requests are sent. So the user either needs to accept that this can happen with items provided to the constructor or there needs to be a way to await the initial sending task, like await requests.wait()
. The close method can't have this role IMO because I think it's important to be able to close the channel synchronously.
Maybe it's not worth being able to construct the channel with an initial source of items to send 😞, which is a pity cos it's slick.
Maybe we could get comparable convenience by adding support for chaining methods so:
requests = await AsyncChannel().send_from([...])
...
requests.close()
This makes some use cases less convenient like:
requests = await AsyncChannel()
sending_task = asyncio.ensure_future(requests.send_from([...]))
... # get some responses
await sending_task
requests.close()
But probably the simplicity and removal of a sharp edge is worth it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, I didn't realize or expect that the sending was in anyway influenced by close. I thought it was just to mark that the stream will have no more data.
That is something you really need, otherwise the connection will stay open forever. But I don't think it should cancel unsent messages.
I experimented a bit with just using a vanilla asyncio.queues.Queue, but quickly found you do need some way to convert it to an async generator (at least for the client as you've proposed it, which i think is good). So actually, if you just have a way to end the stream, and to async generate, it's all you need.
class MessageQueue(asyncio.queues.Queue):
__close = object()
async def close(self):
await self.put(self.__close)
def close_nowait(self):
self.put_nowait(self.__close)
async def __aiter__(self):
while True:
element = await self.get()
if element is self.__close:
self.task_done()
break
yield element
self.task_done()
Test scenario:
@pytest.mark.asyncio
async def test_send_from_async_queue(client, expected_responses):
requests = MessageQueue()
await requests.put(Message(body="Hello world 1"))
await requests.put(Message(body="Hello world 2"))
responses = client.connect(requests)
await requests.close()
assert await to_list(responses) == expected_responses
This little class does the job for me.
If I want to send from another async generator, I don't need this class, I can just pass the generator directly as an argument to the client.
If I want to send from a list, I can do the same. If I want some complex combination of both, I can call put
manually in a loop or asynchronously and it will work.
On its own AsyncChannel is a good looking and reusable abstraction, but in the context of a client that already accepts lists and async generators, I think its use will be limited.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think sending has to cancel unsent messages, because theres' no guarantee that the number of unsent messages might be large or indefinite.
That's clever. And I'd probably prefer that kind of solution for its simplicity within the scope of a single app. But I think a library should have more clearly defined abstractions, and work as expected with a broader range of edge cases. A channel is conceptually a better fit than a queue for an object with the required set of basic operations: FIFO add/remove, concurrent iteration, and close. It's less code to implement it as a subtype of Queue, but I think naming/semantics/encapsulation are important too, and so abstracting the actual queue as an implementation detail is worthwhile.
I also think send_from is an important enough use case to support as first class, rather than requiring the user to write async for loops to connect things together. I image if I were using this pattern a few times within a project I'd end up writing a pipe_this_iterable_into_that_channel helper function and stashing it in a utils module somewhere, which is a pity when the library can support it natively without any obvious compromise to the scope or design.
d6a7427
to
e1ccd54
Compare
I spent today getting intimate with the Client Stream/Stream connections.
This PR contains some changes to fix some basic use-cases.
However, I could not fix all scenarios that I thought were intended to be supported.
I've added a test file that I have run against a real gRPC server (it returns all messages verbatim, plus another 'Done' message at the end).
In the end I found that its possible to run the same tests with a fake client that is simply an async generator, but I recommend testing this with an actual server to be sure we have tested this feature end-to-end before release.