forked from danielgtaylor/python-betterproto
Client streaming tests #1
Merged: nat-n merged 5 commits into nat-n:client-streaming from boukeversteegh:client-streaming-tests on Jun 16, 2020.
Changes from all 5 commits:
159c30d (boukeversteegh): Fix close not awaitable, fix done is callable, fix return async next …
f7aa615 (boukeversteegh): Add test-cases for client stream-stream
0814729 (boukeversteegh): Add cases for send()
50bb67b (nat-n): Fix bugs and remove footgun feature in AsyncChannel
4e78fe9 (nat-n): Merge branch 'client-streaming' into client-streaming-tests
New file (@@ -0,0 +1,151 @@):

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator

import pytest

import betterproto
from betterproto.grpc.util.async_channel import AsyncChannel


@dataclass
class Message(betterproto.Message):
    body: str = betterproto.string_field(1)


@pytest.fixture
def expected_responses():
    return [Message("Hello world 1"), Message("Hello world 2"), Message("Done")]


class ClientStub:
    async def connect(self, requests: AsyncIterator):
        await asyncio.sleep(0.1)
        async for request in requests:
            await asyncio.sleep(0.1)
            yield request
        await asyncio.sleep(0.1)
        yield Message("Done")


async def to_list(generator: AsyncIterator):
    lis = []
    async for value in generator:
        lis.append(value)
    return lis


@pytest.fixture
def client():
    # channel = Channel(host='127.0.0.1', port=50051)
    # return ClientStub(channel)
    return ClientStub()


@pytest.mark.asyncio
async def test_from_list_close_automatically(client, expected_responses):
    requests = AsyncChannel(
        [Message(body="Hello world 1"), Message(body="Hello world 2")], close=True
    )

    responses = client.connect(requests)

    assert await to_list(responses) == expected_responses


@pytest.mark.asyncio
async def test_from_list_close_manually_immediately(client, expected_responses):
    requests = AsyncChannel(
        [Message(body="Hello world 1"), Message(body="Hello world 2")], close=False
    )

    requests.close()

    responses = client.connect(requests)

    assert await to_list(responses) == expected_responses


@pytest.mark.asyncio
async def test_from_list_close_manually_after_connect(client, expected_responses):
    requests = AsyncChannel(
        [Message(body="Hello world 1"), Message(body="Hello world 2")], close=False
    )

    responses = client.connect(requests)

    requests.close()

    assert await to_list(responses) == expected_responses


@pytest.mark.asyncio
async def test_send_from_before_connect_and_close_automatically(
    client, expected_responses
):
    requests = AsyncChannel()

    await requests.send_from(
        [Message(body="Hello world 1"), Message(body="Hello world 2")], close=True
    )

    responses = client.connect(requests)

    assert await to_list(responses) == expected_responses


@pytest.mark.asyncio
async def test_send_from_after_connect_and_close_automatically(
    client, expected_responses
):
    requests = AsyncChannel()

    responses = client.connect(requests)

    await requests.send_from(
        [Message(body="Hello world 1"), Message(body="Hello world 2")], close=True
    )

    assert await to_list(responses) == expected_responses


@pytest.mark.asyncio
async def test_send_from_close_manually_immediately(client, expected_responses):
    requests = AsyncChannel()

    responses = client.connect(requests)

    await requests.send_from(
        [Message(body="Hello world 1"), Message(body="Hello world 2")], close=False
    )

    requests.close()

    assert await to_list(responses) == expected_responses


@pytest.mark.asyncio
async def test_send_individually_and_close_before_connect(client, expected_responses):
    requests = AsyncChannel()

    await requests.send(Message(body="Hello world 1"))
    await requests.send(Message(body="Hello world 2"))
    requests.close()

    responses = client.connect(requests)

    assert await to_list(responses) == expected_responses


@pytest.mark.asyncio
async def test_send_individually_and_close_after_connect(client, expected_responses):
    requests = AsyncChannel()

    await requests.send(Message(body="Hello world 1"))
    await requests.send(Message(body="Hello world 2"))

    responses = client.connect(requests)

    requests.close()

    assert await to_list(responses) == expected_responses
```
Technically this is an incorrect usage of the API (though it could be the API that's wrong...).
It looks like the sending task is canceled on the call to close, having not been given much chance to complete, and so to_list(responses) doesn't get all the responses because the request was cut short.
Now I can see how this is surprising, but it's not obvious whether the solution to this is a design change or just some documentation of the kind of sharp edge that is common with async/await.
It's not possible to await the construction of the channel, and the intention is that the calling coro can continue while requests are sent. So the user either needs to accept that this can happen with items provided to the constructor, or there needs to be a way to await the initial sending task, like await requests.wait().
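Roughly this shape (just a sketch; wait() is hypothetical and not part of the current AsyncChannel):

```python
# Sketch only: wait() is a hypothetical method for awaiting the sending task
# started for the items passed to the constructor; it does not exist today.
requests = AsyncChannel(
    [Message(body="Hello world 1"), Message(body="Hello world 2")], close=False
)
responses = client.connect(requests)

await requests.wait()  # let the constructor-provided items finish sending
requests.close()       # closing now no longer cuts the request stream short

assert await to_list(responses) == expected_responses
```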
The close method can't have this role IMO, because I think it's important to be able to close the channel synchronously. Maybe it's not worth being able to construct the channel with an initial source of items to send 😞, which is a pity because it's slick.
Maybe we could get comparable convenience by adding support for chaining methods, so that something like the following would work (a sketch only; it assumes send_from would return the channel itself, which it currently doesn't):
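```python
# Sketch only: assumes send_from() would "return self" so calls can be chained.
requests = await AsyncChannel().send_from(
    [Message(body="Hello world 1"), Message(body="Hello world 2")], close=True
)
responses = client.connect(requests)

assert await to_list(responses) == expected_responses
```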
This makes some use cases a bit less convenient. For instance (purely as an illustration), a pre-loaded channel can no longer be built in a single expression from synchronous code; the send has to move into a coroutine:
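```python
# Sketch only, illustrating the cost of dropping the constructor source.
# With a constructor source, a synchronous function could hand back a
# channel that is already loaded and closed:
#
#     return AsyncChannel(initial_messages, close=True)
#
# Without it, the equivalent setup needs an awaited call inside a coroutine:
requests = AsyncChannel()
await requests.send_from(
    [Message(body="Hello world 1"), Message(body="Hello world 2")], close=True
)
responses = client.connect(requests)
```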
But probably the simplicity and the removal of a sharp edge are worth it.
I see, I didn't realize or expect that the sending was in any way influenced by close. I thought it was just there to mark that the stream will have no more data.
That is something you really need; otherwise the connection will stay open forever. But I don't think it should cancel unsent messages.
I experimented a bit with just using a vanilla asyncio.queues.Queue, but quickly found you do need some way to convert it to an async generator (at least for the client as you've proposed it, which I think is good). So actually, if you just have a way to end the stream and to async-generate, that's all you need.
In my test scenario, a little class like the one below does the job for me.
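Sketch of the idea (the class name and the sentinel detail are just illustrative):

```python
import asyncio


class ClosableQueue(asyncio.Queue):
    """A vanilla asyncio.Queue that can be closed and consumed as an async generator."""

    _CLOSED = object()  # sentinel marking the end of the stream

    def close(self):
        # Synchronous: just mark that no more items will follow.
        self.put_nowait(self._CLOSED)

    async def __aiter__(self):
        while True:
            item = await self.get()
            if item is self._CLOSED:
                return
            yield item


# Used the same way as AsyncChannel in the tests above:
#
#     requests = ClosableQueue()
#     responses = client.connect(requests)
#     await requests.put(Message(body="Hello world 1"))
#     await requests.put(Message(body="Hello world 2"))
#     requests.close()
#     assert await to_list(responses) == expected_responses
```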
If I want to send from another async generator, I don't need this class; I can just pass the generator directly as an argument to the client. If I want to send from a list, I can do the same. If I want some complex combination of both, I can call put() manually in a loop or asynchronously and it will work.
On its own, AsyncChannel is a good-looking and reusable abstraction, but in the context of a client that already accepts lists and async generators, I think its use will be limited.
I think close has to cancel unsent messages, because there's no guarantee that the number of unsent messages isn't large or indefinite.
That's clever. And I'd probably prefer that kind of solution for its simplicity within the scope of a single app. But I think a library should have more clearly defined abstractions, and work as expected with a broader range of edge cases. A channel is conceptually a better fit than a queue for an object with the required set of basic operations: FIFO add/remove, concurrent iteration, and close. It's less code to implement it as a subtype of Queue, but I think naming/semantics/encapsulation are important too, and so abstracting the actual queue as an implementation detail is worthwhile.
I also think send_from is an important enough use case to support as first-class, rather than requiring the user to write async for loops to connect things together. I imagine if I were using this pattern a few times within a project I'd end up writing a pipe_this_iterable_into_that_channel helper function and stashing it in a utils module somewhere, which is a pity when the library can support it natively without any obvious compromise to the scope or design.
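Roughly the kind of helper I mean (sketch; the signature and behaviour are just for illustration):

```python
# Sketch only: the ad-hoc glue a user would otherwise write to pipe an async
# iterable into a channel, versus the built-in send_from().
async def pipe_this_iterable_into_that_channel(source, channel, close=False):
    async for item in source:
        await channel.send(item)
    if close:
        channel.close()


# With first-class support this is simply:
#
#     await requests.send_from(source, close=True)
```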